Matej Sprysl created KAFKA-16432:
------------------------------------

             Summary: KStreams: Joining KStreams and GlobalKTable requires a state store
                 Key: KAFKA-16432
                 URL: https://issues.apache.org/jira/browse/KAFKA-16432
             Project: Kafka
          Issue Type: Bug
            Reporter: Matej Sprysl


h2. Code:
{code:java}
final StreamsBuilder builder = new StreamsBuilder();

final GlobalKTable<Long, Long> table =
    builder.globalTable("tableTopic", Consumed.with(Serdes.Long(), Serdes.Long()));

final KStream<byte[], Message> stream =
    builder.stream(
        Pattern.compile("streamTopic"),
        Consumed.with(Serdes.ByteArray(), Message.SERDE));

// (...some processing, no state store added...)

final var joiner = new MyJoiner();
final var keyMapper = new MyKeyMapper();

final KStream<byte[], Message> enriched =
    messages
        .join(table, keyMapper, joiner, Named.as("innerJoin"));
{code}
h2. Error:
{code:java}
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor 
innerJoin has no access to StateStore tableTopic-STATE-STORE-0000000000 as the 
store is not connected to the processor. If you add stores manually via 
'.addStateStore()' make sure to connect the added store to the processor by 
providing the processor name to '.addStateStore()' or connect them via 
'.connectProcessorAndStateStores()'. DSL users need to provide the store name 
to '.process()', '.transform()', or '.transformValues()' to connect the store 
to the corresponding operator, or they can provide a StoreBuilder by 
implementing the stores() method on the Supplier itself. If you do not add 
stores manually, please file a bug report at 
https://issues.apache.org/jira/projects/KAFKA.
{code}
h2. Additional notes:

This error occurs when I join a KStream with a GlobalKTable.

Note that I am not adding or connecting any state store manually.


