[ 
https://issues.apache.org/jira/browse/KAFKA-13373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victoria Xia updated KAFKA-13373:
---------------------------------
    Labels: newbie  (was: )

> ValueTransformerWithKeySupplier doesn't work with store()
> ---------------------------------------------------------
>
>                 Key: KAFKA-13373
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13373
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Anatoly Tsyganenko
>            Priority: Minor
>              Labels: newbie
>
> I'm trying to utilize stores() method in ValueTransformerWithKeySupplier like 
> this:
>  
> {code:java}
> public final class CustomSupplier implements 
> ValueTransformerWithKeySupplier<Windowed<String>, JsonNode, JsonNode> {
> private final String storeName = "my-store";
>     public Set<StoreBuilder<?>> stores() {
>         final Deserializer<JsonNode> jsonDeserializer = new 
> JsonDeserializer();
>         final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
>         final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, 
> jsonDeserializer);
>         final Serde<String> stringSerde = Serdes.String();
>         final StoreBuilder<TimestampedKeyValueStore<String, JsonNode>> store 
> = 
> Stores.timestampedKeyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
>                         stringSerde, jsonSerde).withLoggingDisabled();
>         return Collections.singleton(store);
>     }
>     @Override
>     public ValueTransformerWithKey<Windowed<String>, JsonNode, JsonNode> 
> get() {
>         return new ValueTransformerWithKey<Windowed<String>, JsonNode, 
> JsonNode>() {
>             private ProcessorContext context;
>             private TimestampedKeyValueStore<String, JsonNode> store;
>             @Override
>             public void init(final ProcessorContext context) {
>                 this.store = context.getStateStore(storeName);
>                 this.context = context;
>             }
> //....
> }{code}
>  
> But got next error for line "this.store = context.getStateStore(storeName);" 
> in init():
> {code:java}
> Caused by: org.apache.kafka.streams.errors.StreamsException: Processor 
> KTABLE-TRANSFORMVALUES-0000000008 has no access to StateStore my-store as the 
> store is not connected to the processor. If you add stores manually via 
> '.addStateStore()' make sure to connect the added store to the processor by 
> providing the processor name to '.addStateStore()' or connect them via 
> '.connectProcessorAndStateStores()'. DSL users need to provide the store name 
> to '.process()', '.transform()', or '.transformValues()' to connect the store 
> to the corresponding operator, or they can provide a StoreBuilder by 
> implementing the stores() method on the Supplier itself. If you do not add 
> stores manually, please file a bug report at 
> https://issues.apache.org/jira/projects/KAFKA.{code}
>  
> The same code works perfect with Transform or when I adding store to builder. 
> Looks like something wrong when ConnectedStoreProvider and 
> ValueTransformerWithKeySupplier used together.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to