[ 
https://issues.apache.org/jira/browse/KAFKA-12925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17361062#comment-17361062
 ] 

Michael Viamari commented on KAFKA-12925:
-----------------------------------------

I can flesh out a larger example if needed, but the basic usage for me was 
getting a reference to the state store using {{context.getStateStore()}} inside 
{{Transformer#init}}, and then when attempting to use 
{{TimestampedKeyValueStore#prefixScan}}, the exception was thrown.
{code:java}
public class TransformerPrefixScan<K, V, KS, VS> implements Transformer<K, V, 
KeyValue<K, V>> {

        private ProcessorContext context;
        private TimestampedKeyValueStore<KS, VS> lookupStore;

        public TransformerPrefixScan() {}

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            lookupStore = context.getStateStore(lookupStoreName);
        }

        @Override
        public KeyValue<K, V> transform(K key, V value) {

            String keyPrefix = extractPrefix(key);
            try (KeyValueIterator<KS, ValueAndTimestamp<VS>> lookupIterator = 
lookupStore.prefixScan(keyPrefix, Serdes.String())) {
                //....handle results....
            }

            return null;
        }

        @Override
        public void close() {

        }
}
{code}

> prefixScan missing from intermediate interfaces
> -----------------------------------------------
>
>                 Key: KAFKA-12925
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12925
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Michael Viamari
>            Assignee: Sagar Rao
>            Priority: Major
>             Fix For: 3.0.0, 2.8.1
>
>
> [KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
>  and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648] 
> introduced support for {{prefixScan}} to StateStores.
> It seems that many of the intermediate {{StateStore}} interfaces are missing 
> a definition for {{prefixScan}}, and as such is not accessible in all cases.
> For example, when accessing the state stores through a the processor context, 
> the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not 
> define {{prefixScan}} and it falls back to the default implementation in 
> {{KeyValueStore}}, which throws {{UnsupportedOperationException}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to