[ https://issues.apache.org/jira/browse/KAFKA-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16149020#comment-16149020 ]
ASF GitHub Bot commented on KAFKA-5650:
---------------------------------------

GitHub user dguy opened a pull request:

    https://github.com/apache/kafka/pull/3767

    KAFKA-5650: add StateStoreBuilder interface and implementations

    Part of KIP-182
    - Add `StateStoreBuilder` interface and `WindowStateStoreBuilder`, `KeyValueStateStoreBuilder`, and `SessionStateStoreBuilder` implementations
    - Add `StoreSupplier`, `WindowBytesStoreSupplier`, `KeyValueBytesStoreSupplier`, `SessionBytesStoreSupplier` interfaces and implementations
    - Add new methods to `Stores` to create the newly added `StoreSupplier` and `StateStoreBuilder` implementations
    - Update `Topology` and `InternalTopology` to use the interfaces

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dguy/kafka kafka-5650

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3767.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3767

----
commit ee543690317581dc09d3d3f06d52df0685501960
Author: Damian Guy <damian....@gmail.com>
Date:   2017-08-25T16:10:35Z

    add StateStoreBuilder interface and implementations

----

> Provide a simple way for custom storage engines to use streams wrapped stores (KIP-182)
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5650
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5650
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Damian Guy
>            Assignee: Damian Guy
>
> As per KIP-182:
> A new interface will be added:
> {code}
> /**
>  * Implementations of this will provide the ability to wrap a given StateStore
>  * with or without caching/logging etc.
>  */
> public interface StateStoreBuilder<T extends StateStore> {
>
>     StateStoreBuilder<T> withCachingEnabled();
>     StateStoreBuilder<T> withCachingDisabled();
>     StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config);
>     StateStoreBuilder<T> withLoggingDisabled();
>     T build();
> }
> {code}
> This interface will be used to wrap stores with caching, logging etc.
> Additionally, some convenience methods will be added to the {{Stores}} class:
> {code}
> public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> persistentKeyValueStore(final String name,
>                                                                                      final Serde<K> keySerde,
>                                                                                      final Serde<V> valueSerde)
>
> public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> inMemoryKeyValueStore(final String name,
>                                                                                    final Serde<K> keySerde,
>                                                                                    final Serde<V> valueSerde)
>
> public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> lruMap(final String name,
>                                                                     final int capacity,
>                                                                     final Serde<K> keySerde,
>                                                                     final Serde<V> valueSerde)
>
> public static <K, V> StateStoreSupplier<WindowStore<K, V>> persistentWindowStore(final String name,
>                                                                                  final Windows windows,
>                                                                                  final Serde<K> keySerde,
>                                                                                  final Serde<V> valueSerde)
>
> public static <K, V> StateStoreSupplier<SessionStore<K, V>> persistentSessionStore(final String name,
>                                                                                    final SessionWindows windows,
>                                                                                    final Serde<K> keySerde,
>                                                                                    final Serde<V> valueSerde)
>
> /**
>  * The following methods are for use with the PAPI. They allow building of StateStores that can be wrapped with
>  * caching, logging, and any other convenient wrappers provided by the KafkaStreams library
>  */
> public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(final StateStoreSupplier<WindowStore<K, V>> supplier)
>
> public <K, V> StateStoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final StateStoreSupplier<KeyValueStore<K, V>> supplier)
>
> public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final StateStoreSupplier<SessionStore<K, V>> supplier)
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
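
For readers who want to see how the fluent `StateStoreBuilder` contract quoted in the issue might be exercised, here is a minimal, self-contained sketch. It does not use the Kafka Streams library: `StateStore` is a one-method stand-in, and `DescribedStore`, `DescribedStoreBuilder`, and `StateStoreBuilderSketch` are hypothetical names invented for illustration. The builder's initial flag values are arbitrary and should not be read as Kafka's defaults; only the five interface methods come from the issue text.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Stand-in for Kafka Streams' StateStore, reduced to one method (assumption for this sketch).
interface StateStore {
    String name();
}

// The builder interface as quoted in the issue description.
interface StateStoreBuilder<T extends StateStore> {
    StateStoreBuilder<T> withCachingEnabled();
    StateStoreBuilder<T> withCachingDisabled();
    StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config);
    StateStoreBuilder<T> withLoggingDisabled();
    T build();
}

// Hypothetical store that only records which wrappers were requested.
final class DescribedStore implements StateStore {
    private final String name;
    final boolean cached;
    final boolean logged;
    final Map<String, String> logConfig;

    DescribedStore(String name, boolean cached, boolean logged, Map<String, String> logConfig) {
        this.name = name;
        this.cached = cached;
        this.logged = logged;
        // Defensive copy so later builder changes cannot affect a built store.
        this.logConfig = Collections.unmodifiableMap(new HashMap<>(logConfig));
    }

    @Override
    public String name() {
        return name;
    }
}

// Hypothetical builder; the initial flag values are arbitrary, not Kafka's defaults.
final class DescribedStoreBuilder implements StateStoreBuilder<DescribedStore> {
    private final String name;
    private boolean cached = false;
    private boolean logged = false;
    private Map<String, String> logConfig = new HashMap<>();

    DescribedStoreBuilder(String name) {
        this.name = name;
    }

    @Override
    public StateStoreBuilder<DescribedStore> withCachingEnabled() {
        cached = true;
        return this;
    }

    @Override
    public StateStoreBuilder<DescribedStore> withCachingDisabled() {
        cached = false;
        return this;
    }

    @Override
    public StateStoreBuilder<DescribedStore> withLoggingEnabled(Map<String, String> config) {
        logged = true;
        logConfig = new HashMap<>(config);
        return this;
    }

    @Override
    public StateStoreBuilder<DescribedStore> withLoggingDisabled() {
        logged = false;
        logConfig = new HashMap<>();
        return this;
    }

    @Override
    public DescribedStore build() {
        return new DescribedStore(name, cached, logged, logConfig);
    }
}

class StateStoreBuilderSketch {
    public static void main(String[] args) {
        // Chain the options, then build; the config entry is just an example key/value.
        DescribedStore store = new DescribedStoreBuilder("counts")
                .withCachingEnabled()
                .withLoggingEnabled(Collections.singletonMap("retention.ms", "60000"))
                .build();
        System.out.println(store.name() + " cached=" + store.cached
                + " logged=" + store.logged + " logConfig=" + store.logConfig);
    }
}
```

Because every `with...` method returns the builder itself, options can be chained in any order and the last call for a given option wins. A real implementation would presumably wrap an inner store with caching and change-logging layers inside `build()` rather than just recording flags.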