[ 
https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6398:
---------------------------------
    Description: 
For any operator that generates a KTable, its {{valueGetterSupplier}} has three 
code path:

1. If the operator is a KTable source operator, using its materialized state 
store for value getter (note that currently we always materialize on KTable 
source).
2. If the operator is an aggregation operator, then its generated KTable should 
always be materialized so we just use its materialized state store.
3. Otherwise, we treat the value getter in a per-operator basis.

For 3) above, what we SHOULD do is that, if the generated KTable is 
materialized, the value getter would just rely on its materialized state store 
to get the value; otherwise we just rely on the operator itself to define which 
parent's value getter to inherit and what computational logic to apply 
on-the-fly to get the value. For example, for {{KTable#filter()}} where the 
{{Materialized}} is not specified, in {{KTableFilterValueGetter}} we just get 
from parent's value getter and then apply the filter on the fly; and in 
addition we should let the future operators to be able to access its parent's 
materialized state store via {{connectProcessorAndStateStore}}.

However, current code does not do this correctly: it 1) does not check if the 
result KTable is materialized or not, but always try to use its parent's value 
getter, and 2) it does not try to connect its parent's materialized store to 
the future operator. As a result, these operators such as {{KTable#filter}}, 
{{KTable#mapValues}}, and {{KTable#join(KTable)}} would result in 
TopologyException when building. The following is an example:

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Using a non-materialized KTable in a stream-table join fails:

{noformat}
final KTable filteredKTable = builder.table("table-topic").filter(...);
builder.stream("stream-topic").join(filteredKTable,...);
{noformat}

fails with
{noformat}
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology 
building: StateStore null is not added yet.

        at 
org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
        at 
org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
        at 
org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
        at 
org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
        at 
org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
{noformat}

Adding a store name is not sufficient as workaround but fails differently:
{noformat}
final KTable filteredKTable = builder.table("table-topic").filter(..., 
"STORE-NAME");
builder.stream("stream-topic").join(filteredKTable,...);
{noformat}

error:
{noformat}
org.apache.kafka.streams.errors.StreamsException: failed to initialize 
processor KSTREAM-JOIN-0000000005

        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid 
topology building: Processor KSTREAM-JOIN-0000000005 has no access to 
StateStore KTABLE-SOURCE-STATE-STORE-0000000000
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
        at 
org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
        at 
org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
        at 
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53)
        at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111)
{noformat}

One can workaround by piping the result through a topic:
{noformat}
final KTable filteredKTable = 
builder.table("table-topic").filter(...).through("TOPIC");;
builder.stream("stream-topic").join(filteredKTable,...);
{noformat}

------------------------------------------------------------------------------------------------------------


Note that there is another minor orthogonal issue of {{KTable#filter}} itself 
that 

  was:
Using a non-materialized KTable in a stream-table join fails:

{noformat}
final KTable filteredKTable = builder.table("table-topic").filter(...);
builder.stream("stream-topic").join(filteredKTable,...);
{noformat}

fails with
{noformat}
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology 
building: StateStore null is not added yet.

        at 
org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
        at 
org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
        at 
org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
        at 
org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
        at 
org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
{noformat}

Adding a store name is not sufficient as workaround but fails differently:
{noformat}
final KTable filteredKTable = builder.table("table-topic").filter(..., 
"STORE-NAME");
builder.stream("stream-topic").join(filteredKTable,...);
{noformat}

error:
{noformat}
org.apache.kafka.streams.errors.StreamsException: failed to initialize 
processor KSTREAM-JOIN-0000000005

        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid 
topology building: Processor KSTREAM-JOIN-0000000005 has no access to 
StateStore KTABLE-SOURCE-STATE-STORE-0000000000
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
        at 
org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
        at 
org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
        at 
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53)
        at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111)
{noformat}

One can workaround by piping the result through a topic:
{noformat}
final KTable filteredKTable = 
builder.table("table-topic").filter(...).through("TOPIC");;
builder.stream("stream-topic").join(filteredKTable,...);
{noformat}


> Non-aggregation KTable generation operator does not construct value getter 
> correctly
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6398
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6398
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.1, 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>            Priority: Critical
>              Labels: bug
>
> For any operator that generates a KTable, its {{valueGetterSupplier}} has 
> three code path:
> 1. If the operator is a KTable source operator, using its materialized state 
> store for value getter (note that currently we always materialize on KTable 
> source).
> 2. If the operator is an aggregation operator, then its generated KTable 
> should always be materialized so we just use its materialized state store.
> 3. Otherwise, we treat the value getter in a per-operator basis.
> For 3) above, what we SHOULD do is that, if the generated KTable is 
> materialized, the value getter would just rely on its materialized state 
> store to get the value; otherwise we just rely on the operator itself to 
> define which parent's value getter to inherit and what computational logic to 
> apply on-the-fly to get the value. For example, for {{KTable#filter()}} where 
> the {{Materialized}} is not specified, in {{KTableFilterValueGetter}} we just 
> get from parent's value getter and then apply the filter on the fly; and in 
> addition we should let the future operators to be able to access its parent's 
> materialized state store via {{connectProcessorAndStateStore}}.
> However, current code does not do this correctly: it 1) does not check if the 
> result KTable is materialized or not, but always try to use its parent's 
> value getter, and 2) it does not try to connect its parent's materialized 
> store to the future operator. As a result, these operators such as 
> {{KTable#filter}}, {{KTable#mapValues}}, and {{KTable#join(KTable)}} would 
> result in TopologyException when building. The following is an example:
> ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
> Using a non-materialized KTable in a stream-table join fails:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...);
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> fails with
> {noformat}
> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology 
> building: StateStore null is not added yet.
>       at 
> org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
>       at 
> org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
> {noformat}
> Adding a store name is not sufficient as workaround but fails differently:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(..., 
> "STORE-NAME");
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> error:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: failed to initialize 
> processor KSTREAM-JOIN-0000000005
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
> Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid 
> topology building: Processor KSTREAM-JOIN-0000000005 has no access to 
> StateStore KTABLE-SOURCE-STATE-STORE-0000000000
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
>       at 
> org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
>       at 
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53)
>       at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111)
> {noformat}
> One can workaround by piping the result through a topic:
> {noformat}
> final KTable filteredKTable = 
> builder.table("table-topic").filter(...).through("TOPIC");;
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> ------------------------------------------------------------------------------------------------------------
> Note that there is another minor orthogonal issue of {{KTable#filter}} itself 
> that 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to