[ https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang resolved KAFKA-6398.
----------------------------------
Resolution: Fixed
Fix Version/s: 1.0.1, 1.1.0

> Non-aggregation KTable generation operator does not construct value getter correctly
> ------------------------------------------------------------------------------------
>
> Key: KAFKA-6398
> URL: https://issues.apache.org/jira/browse/KAFKA-6398
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.11.0.1, 1.0.0
> Reporter: Matthias J. Sax
> Assignee: Guozhang Wang
> Priority: Critical
> Labels: bug
> Fix For: 1.1.0, 1.0.1
>
>
> For any operator that generates a KTable, its {{valueGetterSupplier}} has
> three code paths:
> 1. If the operator is a KTable source operator, its materialized state store
> is used for the value getter (note that we currently always materialize
> KTable sources).
> 2. If the operator is an aggregation operator, its generated KTable should
> always be materialized, so we just use its materialized state store.
> 3. Otherwise, the value getter is handled on a per-operator basis.
> For 3) above, what we SHOULD do is this: if the generated KTable is
> materialized, the value getter just relies on its materialized state store
> to get the value; otherwise, the operator itself defines which parent's
> value getter to inherit and what computation to apply on the fly to get the
> value. For example, for {{KTable#filter()}} where {{Materialized}} is not
> specified, {{KTableFilterValueGetter}} just gets the value from the parent's
> value getter and then applies the filter on the fly. In addition, downstream
> operators should be able to access the parent's materialized state store
> via {{connectProcessorAndStateStore}}.
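The intended rule for case 3) can be illustrated with a small, self-contained Java model. This is a sketch under stated assumptions, not Kafka Streams code: `ValueGetter`, `filterValueGetter`, and the use of a plain `Map` as a "state store" are hypothetical stand-ins. It shows a filtered KTable reading from its own store when materialized, and otherwise delegating to the parent's getter and applying the predicate on the fly. The predicate takes `(key, value)`, mirroring the two-argument `Predicate` of the Kafka Streams DSL.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

// Runnable model of the value-getter rule for case 3 (a filter).
// All names are hypothetical stand-ins, not the Kafka Streams internals.
class ValueGetterSketch {
    public static void main(String[] args) {
        // Case 1: a materialized source KTable; its store is modeled as a plain Map.
        Map<String, String> sourceStore = new HashMap<>();
        sourceStore.put("a", "apple");
        sourceStore.put("b", "banana");
        ValueGetter<String, String> sourceGetter = sourceStore::get;

        // Case 3, not materialized: no store of its own, so values are computed on the fly.
        ValueGetter<String, String> filteredGetter =
            filterValueGetter(sourceGetter, (key, value) -> value.startsWith("a"), null);

        System.out.println(filteredGetter.get("a")); // apple
        System.out.println(filteredGetter.get("b")); // null: filtered out
    }

    /**
     * Builds the value getter for a filtered KTable (hypothetical helper).
     *
     * @param parent         value getter of the upstream KTable
     * @param predicate      filter predicate over (key, value)
     * @param ownStoreOrNull the filter result's own store if it is materialized, else null
     */
    static <K, V> ValueGetter<K, V> filterValueGetter(ValueGetter<K, V> parent,
                                                      BiPredicate<K, V> predicate,
                                                      Map<K, V> ownStoreOrNull) {
        if (ownStoreOrNull != null) {
            // Materialized result: read directly from its own store.
            return ownStoreOrNull::get;
        }
        // Not materialized: delegate to the parent, then apply the predicate on the fly.
        // Keys whose value fails the predicate are treated as absent (null).
        return key -> {
            V value = parent.get(key);
            return (value != null && predicate.test(key, value)) ? value : null;
        };
    }
}

// Minimal read-only view of a KTable (hypothetical).
@FunctionalInterface
interface ValueGetter<K, V> {
    V get(K key);
}
```

The non-materialized branch needs no store of its own, which is why a downstream processor using it must instead be granted access to whatever store the parent's getter reads.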
> However, the current code does not do this correctly: 1) it does not check
> whether the result KTable is materialized, but always tries to use its
> parent's value getter, and 2) it does not connect its parent's materialized
> store to downstream operators. As a result, operators such as
> {{KTable#filter}}, {{KTable#mapValues}}, and {{KTable#join(KTable)}} can
> cause a TopologyException when the topology is built. For example:
> ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
> Using a non-materialized KTable in a stream-table join fails:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...);
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> with the following error:
> {noformat}
> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: StateStore null is not added yet.
>     at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
>     at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
>     at org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
>     at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
>     at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
> {noformat}
> Adding a store name is not a sufficient workaround; it fails differently:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(..., "STORE-NAME");
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> error:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-JOIN-0000000005
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
>     at org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
>     at org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
> Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Processor KSTREAM-JOIN-0000000005 has no access to StateStore KTABLE-SOURCE-STATE-STORE-0000000000
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
>     at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
>     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
>     at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111)
> {noformat}
> One can work around this by piping the result through a topic:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...).through("TOPIC");
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> ------------------------------------------------------------------------------------------------------------
> Note that there is another, minor issue with {{KTable#filter}} itself,
> orthogonal to this one: when it is not materialized, it does not include its
> parent's queryable store name (see {{KTable#mapValues}} for reference).
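Point 2) of the bug, connecting downstream operators to the right store, can be modeled the same way. The Java sketch below is hypothetical (`TableNode` and `storesToConnect` are not Kafka Streams APIs): it resolves which store names a downstream processor such as a stream-table join would need access to, namely the table's own store if it is materialized, otherwise the stores of its parents, recursively. A set is returned because a `KTable#join(KTable)` result has two parents. The `StateStore null is not added yet` failure in the report appears consistent with connecting the join to the non-materialized filter's own store name, which is `null`; the second failure appears consistent with the join being connected to `STORE-NAME` while the filter's value getter still read the source store.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of store-name resolution for downstream processors.
class StoreResolutionSketch {
    public static void main(String[] args) {
        // Topology shape from the report: a materialized source KTable feeding a non-materialized filter.
        TableNode source = new TableNode("KTABLE-SOURCE", "KTABLE-SOURCE-STATE-STORE-0000000000");
        TableNode filtered = new TableNode("KTABLE-FILTER", null, source);

        // Using the filter's own store name gives null: there is nothing to connect.
        System.out.println(filtered.storeName);        // null
        // Resolving through the parents yields the store the value getter actually reads.
        System.out.println(storesToConnect(filtered)); // [KTABLE-SOURCE-STATE-STORE-0000000000]
    }

    // Store names a downstream processor must be connected to, in discovery order.
    static Set<String> storesToConnect(TableNode table) {
        Set<String> result = new LinkedHashSet<>();
        collect(table, result);
        return result;
    }

    private static void collect(TableNode table, Set<String> out) {
        if (table.storeName != null) {
            out.add(table.storeName); // materialized: its own store is sufficient
            return;
        }
        if (table.parents.isEmpty()) {
            throw new IllegalStateException("non-materialized table without parents: " + table.name);
        }
        for (TableNode parent : table.parents) {
            collect(parent, out);     // not materialized: inherit the parents' stores
        }
    }
}

// A KTable in the topology: storeName is null when not materialized (hypothetical).
final class TableNode {
    final String name;
    final String storeName;
    final List<TableNode> parents;

    TableNode(String name, String storeName, TableNode... parents) {
        this.name = name;
        this.storeName = storeName;
        this.parents = Arrays.asList(parents);
    }
}
```

In the real code base the resolved names would then be passed to something like the {{connectProcessorAndStateStore}} call mentioned in the description; the sketch only models the name resolution, not the topology wiring.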