[ 
https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6398.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 1.0.1
                   1.1.0

> Non-aggregation KTable generation operator does not construct value getter 
> correctly
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6398
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6398
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.1, 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>            Priority: Critical
>              Labels: bug
>             Fix For: 1.1.0, 1.0.1
>
>
> For any operator that generates a KTable, its {{valueGetterSupplier}} has 
> three code path:
> 1. If the operator is a KTable source operator, using its materialized state 
> store for value getter (note that currently we always materialize on KTable 
> source).
> 2. If the operator is an aggregation operator, then its generated KTable 
> should always be materialized so we just use its materialized state store.
> 3. Otherwise, we treat the value getter in a per-operator basis.
> For 3) above, what we SHOULD do is that, if the generated KTable is 
> materialized, the value getter would just rely on its materialized state 
> store to get the value; otherwise we just rely on the operator itself to 
> define which parent's value getter to inherit and what computational logic to 
> apply on-the-fly to get the value. For example, for {{KTable#filter()}} where 
> the {{Materialized}} is not specified, in {{KTableFilterValueGetter}} we just 
> get from parent's value getter and then apply the filter on the fly; and in 
> addition we should let the future operators to be able to access its parent's 
> materialized state store via {{connectProcessorAndStateStore}}.
> However, current code does not do this correctly: it 1) does not check if the 
> result KTable is materialized or not, but always try to use its parent's 
> value getter, and 2) it does not try to connect its parent's materialized 
> store to the future operator. As a result, these operators such as 
> {{KTable#filter}}, {{KTable#mapValues}}, and {{KTable#join(KTable)}} would 
> result in TopologyException when building. The following is an example:
> ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
> Using a non-materialized KTable in a stream-table join fails:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...);
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> fails with
> {noformat}
> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology 
> building: StateStore null is not added yet.
>       at 
> org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
>       at 
> org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
> {noformat}
> Adding a store name is not sufficient as workaround but fails differently:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(..., 
> "STORE-NAME");
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> error:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: failed to initialize 
> processor KSTREAM-JOIN-0000000005
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
> Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid 
> topology building: Processor KSTREAM-JOIN-0000000005 has no access to 
> StateStore KTABLE-SOURCE-STATE-STORE-0000000000
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
>       at 
> org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
>       at 
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53)
>       at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111)
> {noformat}
> One can workaround by piping the result through a topic:
> {noformat}
> final KTable filteredKTable = 
> builder.table("table-topic").filter(...).through("TOPIC");;
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> ------------------------------------------------------------------------------------------------------------
> Note that there is another minor orthogonal issue of {{KTable#filter}} itself 
> that it does not include its parent's queryable store name when itself is not 
> materialized (see {{KTable#mapValues}} for reference).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to