[ https://issues.apache.org/jira/browse/KAFKA-13373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17437620#comment-17437620 ]
Anatoly Tsyganenko commented on KAFKA-13373:
--------------------------------------------

As I said, the error occurred in init() on this line: "this.store = context.getStateStore(storeName);". The supplier I posted is the same one I used during testing; I only removed the close() function. I've tried again, but I can't reproduce the error anymore either. Sorry :(. I will update the ticket if I find any additional information.

> ValueTransformerWithKeySupplier doesn't work with store()
> ---------------------------------------------------------
>
>                 Key: KAFKA-13373
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13373
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Anatoly Tsyganenko
>            Assignee: Aleksandr Sorokoumov
>            Priority: Minor
>              Labels: newbie
>
> I'm trying to use the stores() method in ValueTransformerWithKeySupplier like
> this:
>
> {code:java}
> public final class CustomSupplier implements ValueTransformerWithKeySupplier<Windowed<String>, JsonNode, JsonNode> {
>     private final String storeName = "my-store";
>
>     public Set<StoreBuilder<?>> stores() {
>         final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
>         final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
>         final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
>         final Serde<String> stringSerde = Serdes.String();
>         final StoreBuilder<TimestampedKeyValueStore<String, JsonNode>> store =
>             Stores.timestampedKeyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), stringSerde, jsonSerde).withLoggingDisabled();
>         return Collections.singleton(store);
>     }
>
>     @Override
>     public ValueTransformerWithKey<Windowed<String>, JsonNode, JsonNode> get() {
>         return new ValueTransformerWithKey<Windowed<String>, JsonNode, JsonNode>() {
>             private ProcessorContext context;
>             private TimestampedKeyValueStore<String, JsonNode> store;
>
>             @Override
>             public void init(final ProcessorContext context) {
>                 this.store = context.getStateStore(storeName);
>                 this.context = context;
>             }
>             //....
> }{code}
>
> But I got the following error for the line "this.store = context.getStateStore(storeName);"
> in init():
> {code:java}
> Caused by: org.apache.kafka.streams.errors.StreamsException: Processor
> KTABLE-TRANSFORMVALUES-0000000008 has no access to StateStore my-store as the
> store is not connected to the processor. If you add stores manually via
> '.addStateStore()' make sure to connect the added store to the processor by
> providing the processor name to '.addStateStore()' or connect them via
> '.connectProcessorAndStateStores()'. DSL users need to provide the store name
> to '.process()', '.transform()', or '.transformValues()' to connect the store
> to the corresponding operator, or they can provide a StoreBuilder by
> implementing the stores() method on the Supplier itself. If you do not add
> stores manually, please file a bug report at
> https://issues.apache.org/jira/projects/KAFKA.{code}
>
> The same code works perfectly with Transform or when I add the store to the builder.
> It looks like something goes wrong when ConnectedStoreProvider and
> ValueTransformerWithKeySupplier are used together.
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)