Anatoly Tsyganenko created KAFKA-13373:
------------------------------------------

             Summary: ValueTransformerWithKeySupplier doesn't work with store()
                 Key: KAFKA-13373
                 URL: https://issues.apache.org/jira/browse/KAFKA-13373
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.8.0
            Reporter: Anatoly Tsyganenko


I'm trying to utilize stores() method in ValueTransformerWithKeySupplier like 
this:

 
{code:java}
public final class CustomSupplier implements 
ValueTransformerWithKeySupplier<Windowed<String>, JsonNode, JsonNode> {

private final String storeName = "my-store";

    public Set<StoreBuilder<?>> stores() {
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, 
jsonDeserializer);
        final Serde<String> stringSerde = Serdes.String();
        final StoreBuilder<TimestampedKeyValueStore<String, JsonNode>> store = 
Stores.timestampedKeyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
                        stringSerde, jsonSerde).withLoggingDisabled();
        return Collections.singleton(store);
    }

    @Override
    public ValueTransformerWithKey<Windowed<String>, JsonNode, JsonNode> get() {
        return new ValueTransformerWithKey<Windowed<String>, JsonNode, 
JsonNode>() {
            private ProcessorContext context;
            private TimestampedKeyValueStore<String, JsonNode> store;

            @Override
            public void init(final ProcessorContext context) {
                this.store = context.getStateStore(storeName);
                this.context = context;
            }
//....
}{code}
 

But got next error for line "this.store = context.getStateStore(storeName);" in 
init():
{code:java}
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor 
KTABLE-TRANSFORMVALUES-0000000008 has no access to StateStore my-store as the 
store is not connected to the processor. If you add stores manually via 
'.addStateStore()' make sure to connect the added store to the processor by 
providing the processor name to '.addStateStore()' or connect them via 
'.connectProcessorAndStateStores()'. DSL users need to provide the store name 
to '.process()', '.transform()', or '.transformValues()' to connect the store 
to the corresponding operator, or they can provide a StoreBuilder by 
implementing the stores() method on the Supplier itself. If you do not add 
stores manually, please file a bug report at 
https://issues.apache.org/jira/projects/KAFKA.{code}
 

The same code works perfect with Transform or when I adding store to builder. 
Looks like something wrong when ConnectedStoreProvider and 
ValueTransformerWithKeySupplier used together.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to