[ https://issues.apache.org/jira/browse/KAFKA-16432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matej Sprysl resolved KAFKA-16432.
----------------------------------
    Resolution: Fixed

Fixed by using the same StreamsBuilder instance to create both the KStream and the GlobalKTable.
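
For anyone hitting the same exception, here is a minimal sketch of a single-builder setup. It is an illustration under assumptions, not the reporter's actual code: the class name SingleBuilderJoin, the String value type, and the two lambdas are hypothetical stand-ins for Message, MyKeyMapper, and MyJoiner, which are not shown in the report. The likely cause of the original error is that a GlobalKTable created from a different StreamsBuilder has its state store registered in that other builder's topology, so the join processor cannot reach it.

```java
import java.util.regex.Pattern;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;

public class SingleBuilderJoin {

    static Topology buildTopology() {
        // One builder for both the table and the stream, so the global
        // table's store and the join processor live in the same topology.
        final StreamsBuilder builder = new StreamsBuilder();

        final GlobalKTable<Long, Long> table =
            builder.globalTable("tableTopic", Consumed.with(Serdes.Long(), Serdes.Long()));

        // Assumption: String values stand in for the reporter's Message type.
        final KStream<byte[], String> stream =
            builder.stream(
                Pattern.compile("streamTopic"),
                Consumed.with(Serdes.ByteArray(), Serdes.String()));

        // Hypothetical key mapper and joiner, replacing MyKeyMapper and MyJoiner.
        final KStream<byte[], String> enriched =
            stream.join(
                table,
                (key, value) -> Long.parseLong(value),
                (value, tableValue) -> value + ":" + tableValue,
                Named.as("innerJoin"));

        return builder.build();
    }

    public static void main(final String[] args) {
        // Print the topology to check that the join processor is present.
        System.out.println(buildTopology().describe());
    }
}
```

Building the topology and printing its description is a quick way to check the wiring before starting a KafkaStreams instance.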

> KStreams: Joining KStreams and GlobalKTable requires a state store
> ------------------------------------------------------------------
>
>                 Key: KAFKA-16432
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16432
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Matej Sprysl
>            Priority: Major
>
> h2. Code:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final GlobalKTable<Long, Long> table =
>     builder.globalTable("tableTopic", Consumed.with(Serdes.Long(), Serdes.Long()));
> final KStream<byte[], Message> stream =
>     builder.stream(
>         Pattern.compile("streamTopic"),
>         Consumed.with(Serdes.ByteArray(), Message.SERDE));
> // (...some processing, no state store added...)
> final var joiner = new MyJoiner();
> final var keyMapper = new MyKeyMapper();
> final KStream<byte[], Message> enriched =
>     messages
>         .join(table, keyMapper, joiner, Named.as("innerJoin"));
> {code}
> h2. Error:
> {code:java}
> Caused by: org.apache.kafka.streams.errors.StreamsException: Processor 
> innerJoin has no access to StateStore tableTopic-STATE-STORE-0000000000 as 
> the store is not connected to the processor. If you add stores manually via 
> '.addStateStore()' make sure to connect the added store to the processor by 
> providing the processor name to '.addStateStore()' or connect them via 
> '.connectProcessorAndStateStores()'. DSL users need to provide the store name 
> to '.process()', '.transform()', or '.transformValues()' to connect the store 
> to the corresponding operator, or they can provide a StoreBuilder by 
> implementing the stores() method on the Supplier itself. If you do not add 
> stores manually, please file a bug report at 
> https://issues.apache.org/jira/projects/KAFKA.
> {code}
> h2. Additional notes:
> This error occurs when I try to join a KStream with a GlobalKTable.
> Note that I am not connecting any state store manually.



