[ https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-6398:
---------------------------------
    Summary: Non-aggregation KTable generation operator does not construct value getter correctly  (was: Stream-Table join fails, if table is not materialized)

> Non-aggregation KTable generation operator does not construct value getter correctly
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6398
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6398
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.1, 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>              Labels: bug
>
> Using a non-materialized KTable in a stream-table join fails:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...);
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> It fails with:
> {noformat}
> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: StateStore null is not added yet.
>     at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
>     at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
>     at org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
>     at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
>     at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
> {noformat}
> Adding a store name is not sufficient as a workaround; it fails differently:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(..., "STORE-NAME");
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> Error:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-JOIN-0000000005
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
>     at org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
>     at org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
> Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Processor KSTREAM-JOIN-0000000005 has no access to StateStore KTABLE-SOURCE-STATE-STORE-0000000000
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
>     at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
>     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
>     at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111)
> {noformat}
> One can work around this by piping the result through a topic:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...).through("TOPIC");
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
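
The second stack trace hints at the failure mode: the filter's value getter delegates to the source table's value getter, whose init() looks up the source state store through the join processor's context, and the join processor was never connected to that store. The following is a simplified, self-contained model of that delegation in plain Java. It is not Kafka's implementation: every name in it (ValueGetterSketch, Context, storeNames(), lookup(), "source-store") is hypothetical and chosen only for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

public class ValueGetterSketch {

    /** Hypothetical processor context: a processor may only read stores connected to it. */
    static final class Context {
        private final Map<String, Map<String, String>> allStores;
        private final Set<String> connected;

        Context(Map<String, Map<String, String>> allStores, Set<String> connected) {
            this.allStores = allStores;
            this.connected = connected;
        }

        Map<String, String> getStateStore(String name) {
            if (!connected.contains(name)) {
                throw new IllegalStateException("Processor has no access to StateStore " + name);
            }
            return allStores.get(name);
        }
    }

    interface ValueGetter {
        void init(Context context);
        String get(String key);
    }

    interface ValueGetterSupplier {
        ValueGetter get();
        /** Stores the consuming processor must be connected to. */
        Set<String> storeNames();
    }

    /** Getter for a source table: reads directly from its own store. */
    static final class SourceSupplier implements ValueGetterSupplier {
        private final String storeName;

        SourceSupplier(String storeName) { this.storeName = storeName; }

        @Override public ValueGetter get() {
            return new ValueGetter() {
                private Map<String, String> store;
                @Override public void init(Context context) { store = context.getStateStore(storeName); }
                @Override public String get(String key) { return store.get(key); }
            };
        }

        @Override public Set<String> storeNames() { return Collections.singleton(storeName); }
    }

    /** Getter for a non-materialized filter: delegates to the parent and applies the predicate. */
    static final class FilterSupplier implements ValueGetterSupplier {
        private final ValueGetterSupplier parent;
        private final Predicate<String> predicate;

        FilterSupplier(ValueGetterSupplier parent, Predicate<String> predicate) {
            this.parent = parent;
            this.predicate = predicate;
        }

        @Override public ValueGetter get() {
            final ValueGetter parentGetter = parent.get();
            return new ValueGetter() {
                @Override public void init(Context context) { parentGetter.init(context); }
                @Override public String get(String key) {
                    String value = parentGetter.get(key);
                    return value != null && predicate.test(value) ? value : null;
                }
            };
        }

        // The filter has no store of its own, so it reports the parent's stores.
        @Override public Set<String> storeNames() { return parent.storeNames(); }
    }

    /** Simulates a join processor that is connected to the given stores and looks up one key. */
    static String lookup(ValueGetterSupplier table, Set<String> connectedStores, String key) {
        Map<String, String> source = new HashMap<>();
        source.put("a", "keep-1");
        source.put("b", "drop-2");
        Map<String, Map<String, String>> stores = new HashMap<>();
        stores.put("source-store", source);

        ValueGetter getter = table.get();
        getter.init(new Context(stores, connectedStores));
        return getter.get(key);
    }

    public static void main(String[] args) {
        ValueGetterSupplier filtered =
            new FilterSupplier(new SourceSupplier("source-store"), v -> v.startsWith("keep"));

        // Connected to the stores reported by the getter chain: the lookup works.
        System.out.println(lookup(filtered, filtered.storeNames(), "a"));

        // Connected to nothing: init() fails, as in the second stack trace.
        try {
            lookup(filtered, Collections.<String>emptySet(), "a");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the lookup succeeds only when the consuming processor is connected to whatever stores the getter chain reports, which for a non-materialized filter are the parent's stores rather than a store of its own. Whether the actual fix in Kafka takes this shape is not stated in the issue.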