Repository: kafka Updated Branches: refs/heads/trunk 716330a5b -> 3dcbbf703
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 66ec0d7..1abc5e7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -166,7 +166,7 @@ public interface KTable<K, V> { * (i.e., that would be equivalent to calling {@link KTable#filter(Predicate)}. * @return a {@code KTable} that contains only those records that satisfy the given predicate * @see #filterNot(Predicate, Materialized) - * @deprecated use {@link #filter(Predicate, Materialized)} + * @deprecated use {@link #filter(Predicate, Materialized) filter(predicate, Materialized.as(queryableStoreName))} */ @Deprecated KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName); @@ -203,7 +203,7 @@ public interface KTable<K, V> { * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@code KTable} that contains only those records that satisfy the given predicate * @see #filterNot(Predicate, Materialized) - * @deprecated use {@link #filter(Predicate, Materialized)} + * @deprecated use {@link #filter(Predicate, Materialized) filter(predicate, Materialized.as(KeyValueByteStoreSupplier))} */ @Deprecated KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier); @@ -297,7 +297,7 @@ public interface KTable<K, V> { * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate * @see #filter(Predicate, Materialized) - * @deprecated use {@link #filterNot(Predicate, Materialized)} + * @deprecated use {@link #filterNot(Predicate, Materialized) filterNot(predicate, Materialized.as(KeyValueByteStoreSupplier))} */ @Deprecated KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier); @@ -336,7 +336,7 @@ public interface KTable<K, V> { * (i.e., that would be equivalent to calling {@link KTable#filterNot(Predicate)}. * @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate * @see #filter(Predicate, Materialized) - * @deprecated use {@link #filter(Predicate, Materialized)} + * @deprecated use {@link #filter(Predicate, Materialized) filterNot(predicate, Materialized.as(queryableStoreName))} */ @Deprecated KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName); @@ -463,7 +463,7 @@ public interface KTable<K, V> { * @param <VR> the value type of the result {@code KTable} * * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type) - * @deprecated use {@link #mapValues(ValueMapper, Materialized)} + * @deprecated use {@link #mapValues(ValueMapper, Materialized) mapValues(mapper, Materialized.as(queryableStoreName).withValueSerde(valueSerde))} */ @Deprecated <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName); @@ -507,7 +507,7 @@ public interface KTable<K, V> { * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @param <VR> the value type of the result {@code KTable} * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type) - * @deprecated use {@link #mapValues(ValueMapper, Materialized)} + * @deprecated use {@link #mapValues(ValueMapper, Materialized) mapValues(mapper, Materialized.as(KeyValueByteStoreSupplier).withValueSerde(valueSerde))} */ @Deprecated <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, @@ -530,7 +530,8 @@ public interface KTable<K, V> { * update record. * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively - * convert to a KStream using {@code toStream()} and then use {@link KStream#print()} on the result. + * convert to a {@link KStream} using {@link #toStream()} and then use + * {@link KStream#print(Printed) print(Printed.toSysOut())} on the result. */ @Deprecated void print(); @@ -551,7 +552,8 @@ public interface KTable<K, V> { * @param label the name used to label the key/value pairs printed to the console * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively - * convert to a KStream using {@code toStream()} and then use {@link KStream#print(String)} on the result. + * convert to a {@link KStream} using {@link #toStream()} and then use + * {@link KStream#print(Printed) print(Printed.toSysOut().withLabel(lable))} on the result. */ @Deprecated void print(final String label); @@ -574,7 +576,8 @@ public interface KTable<K, V> { * @param valSerde value serde used to deserialize value if type is {@code byte[]} * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively - * convert to a KStream using {@code toStream()} and then use {@link KStream#print(Serde, Serde)} on the result. + * convert to a {@link KStream} using {@link #toStream()} and then use + * {@link KStream#print(Printed) print(Printed.toSysOut().withKeyValueMapper(...)} on the result. */ @Deprecated void print(final Serde<K> keySerde, @@ -598,7 +601,8 @@ public interface KTable<K, V> { * @param label the name used to label the key/value pairs printed to the console * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively - * convert to a KStream using {@code toStream()} and then use {@link KStream#print(Serde, Serde, String)} on the result. + * convert to a {@link KStream} using {@link #toStream()} and then use + * {@link KStream#print(Printed) print(Printed.toSysOut().withLabel(label).withKeyValueMapper(...)} on the result. */ @Deprecated void print(final Serde<K> keySerde, @@ -622,7 +626,8 @@ public interface KTable<K, V> { * @param filePath name of file to write to * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively - * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String)}} on the result. + * convert to a {@link KStream} using {@link #toStream()} and then use + * {@link KStream#print(Printed) print(Printed.toFile(filePath)} on the result. */ @Deprecated void writeAsText(final String filePath); @@ -644,7 +649,8 @@ public interface KTable<K, V> { * @param label the name used to label the key/value pairs printed out to the console * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively - * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String, String)}} on the result. + * convert to a {@link KStream} using {@link #toStream()} and then use + * {@link KStream#print(Printed) print(Printed.toFile(filePath).withLabel(label)} on the result. */ @Deprecated void writeAsText(final String filePath, @@ -669,7 +675,8 @@ public interface KTable<K, V> { * @param valSerde value serde used to deserialize value if type is {@code byte[]} * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively - * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String, Serde, Serde)}} on the result. + * convert to a {@link KStream} using {@link #toStream()} and then use + * {@link KStream#print(Printed) print(Printed.toFile(filePath).withKeyValueMapper(...)} on the result. */ @Deprecated void writeAsText(final String filePath, @@ -695,8 +702,8 @@ public interface KTable<K, V> { * @param valSerde value serde used to deserialize value if type is {@code byte[]} * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively - * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String, String, Serde, Serde)}} on the result. - + * convert to a {@link KStream} using {@link #toStream()} and then use + * {@link KStream#print(Printed) print(Printed.toFile(filePath).withLabel(label).withKeyValueMapper(...)} on the result. */ @Deprecated void writeAsText(final String filePath, @@ -714,7 +721,8 @@ public interface KTable<K, V> { * @param action an action to perform on each record * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively - * convert to a KStream using {@code toStream()} and then use {@link KStream#foreach(ForeachAction)}} on the result. + * convert to a {@link KStream} using {@link #toStream()} and then use + * {@link KStream#foreach(ForeachAction) foreach(action)} on the result. */ @Deprecated void foreach(final ForeachAction<? super K, ? super V> action); @@ -763,18 +771,19 @@ public interface KTable<K, V> { * started). * <p> * This is equivalent to calling {@link #to(String) #to(someTopicName)} and - * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}. + * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. - * {@link StreamsBuilder#table(String, Materialized)}) + * {@link KStreamBuilder#table(String, String)}) * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * * @param topic the topic name * @param queryableStoreName the state store name used for the result {@code KTable}; valid characters are ASCII - * alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KTable#through(String)()} + * alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KTable#through(String)} * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} - * @deprecated use {@link #toStream()} followed by {@link KStream#to(String)} - * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} + * @deprecated use {@link #toStream()} followed by {@link KStream#to(String) to(topic)} and + * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(queryableStoreName))} + * to read back as a {@code KTable} */ @Deprecated KTable<K, V> through(final String topic, @@ -787,17 +796,18 @@ public interface KTable<K, V> { * started). * <p> * This is equivalent to calling {@link #to(String) #to(someTopicName)} and - * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}. + * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. - * {@link StreamsBuilder#table(String, Materialized)}) + * {@link KStreamBuilder#table(String, String)}) * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * * @param topic the topic name * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} - * @deprecated use {@link #toStream()} followed by {@link KStream#to(String)} - * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} + * @deprecated use {@link #toStream()} followed by {@link KStream#to(String) to(topic)} and + * and {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier))} + * to read back as a {@code KTable} */ @Deprecated KTable<K, V> through(final String topic, @@ -810,15 +820,15 @@ public interface KTable<K, V> { * started). * <p> * This is equivalent to calling {@link #to(String) #to(someTopicName)} and - * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}. + * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf. - * {@link StreamsBuilder#table(String)}) + * {@link KStreamBuilder#table(String)}) * * @param topic the topic name * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} - * @deprecated use {@link #toStream()} followed by {@link KStream#to(String)} - * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} + * @deprecated use {@link #toStream()} followed by {@link KStream#to(String) to(topic)} and + * and {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to read back as a {@code KTable} */ @Deprecated KTable<K, V> through(final String topic); @@ -831,17 +841,18 @@ public interface KTable<K, V> { * started). * <p> * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and - * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}. + * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf. - * {@link StreamsBuilder#table(String)}) + * {@link KStreamBuilder#table(String)}) * * @param partitioner the function used to determine how records are distributed among partitions of the topic, * if not specified producer's {@link DefaultPartitioner} will be used * @param topic the topic name * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} - * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} - * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} + * @deprecated use {@link #toStream()} followed by + * {@link KStream#to(String, Produced) to(topic, Produced.streamPartitioner(partitioner))} and + * {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to read back as a {@code KTable} */ @Deprecated KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner, @@ -855,10 +866,10 @@ public interface KTable<K, V> { * started). * <p> * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and - * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}. + * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. - * {@link StreamsBuilder#table(String, Materialized)}) + * {@link KStreamBuilder#table(String, String)}) * * @param partitioner the function used to determine how records are distributed among partitions of the topic, * if not specified producer's {@link DefaultPartitioner} will be used @@ -866,8 +877,10 @@ public interface KTable<K, V> { * @param queryableStoreName the state store name used for the result {@code KTable}. * If {@code null} this is the equivalent of {@link KTable#through(StreamPartitioner, String)} * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} - * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} - * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} + * @deprecated use {@link #toStream()} followed by + * {@link KStream#to(String, Produced) to(topic, Produced.streamPartitioner(partitioner))} and + * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(queryableStoreName))} + * to read back as a {@code KTable} */ @Deprecated KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner, @@ -882,18 +895,20 @@ public interface KTable<K, V> { * started). * <p> * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and - * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}. + * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. - * {@link StreamsBuilder#table(String, Materialized)}) + * {@link KStreamBuilder#table(String, String)}) * * @param partitioner the function used to determine how records are distributed among partitions of the topic, * if not specified producer's {@link DefaultPartitioner} will be used * @param topic the topic name * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} - * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} - * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} + * @deprecated use {@link #toStream()} followed by + * {@link KStream#to(String, Produced) to(topic, Produced.streamPartitioner(partitioner))} and + * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier)} + * to read back as a {@code KTable} */ @Deprecated KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner, @@ -909,10 +924,10 @@ public interface KTable<K, V> { * used—otherwise producer's {@link DefaultPartitioner} is used. * <p> * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and - * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}. + * {@link KStreamBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. - * {@link StreamsBuilder#table(String, Materialized)}) + * {@link KStreamBuilder#table(String, String)}) * * @param keySerde key serde used to send key-value pairs, * if not specified the default key serde defined in the configuration will be used @@ -920,13 +935,16 @@ public interface KTable<K, V> { * if not specified the default value serde defined in the configuration will be used * @param topic the topic name * @param queryableStoreName the state store name used for the result {@code KTable}. - * If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, String)()} + * If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, String)} * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} - * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} - * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} + * @deprecated use {@link #toStream()} followed by + * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde))} and + * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(queryableStoreName))} + * to read back as a {@code KTable} */ @Deprecated - KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde, + KTable<K, V> through(final Serde<K> keySerde, + final Serde<V> valSerde, final String topic, final String queryableStoreName); @@ -939,7 +957,7 @@ public interface KTable<K, V> { * used—otherwise producer's {@link DefaultPartitioner} is used. * <p> * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and - * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}. + * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. * {@link StreamsBuilder#table(String, Materialized)}) @@ -951,11 +969,14 @@ public interface KTable<K, V> { * @param topic the topic name * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} - * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} - * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} + * @deprecated use {@link #toStream()} followed by + * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde))} and + * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier)} + * to read back as a {@code KTable} */ @Deprecated - KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde, + KTable<K, V> through(final Serde<K> keySerde, + final Serde<V> valSerde, final String topic, final StateStoreSupplier<KeyValueStore> storeSupplier); @@ -968,10 +989,10 @@ public interface KTable<K, V> { * used—otherwise producer's {@link DefaultPartitioner} is used. * <p> * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and - * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}. + * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with an interna; store name (cf. - * {@link StreamsBuilder#table(String)}) + * {@link KStreamBuilder#table(String)}) * * @param keySerde key serde used to send key-value pairs, * if not specified the default key serde defined in the configuration will be used @@ -979,11 +1000,13 @@ public interface KTable<K, V> { * if not specified the default value serde defined in the configuration will be used * @param topic the topic name * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} - * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} - * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} + * @deprecated use {@link #toStream()} followed by + * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde))} + * and {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to read back as a {@code KTable} */ @Deprecated - KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde, + KTable<K, V> through(final Serde<K> keySerde, + final Serde<V> valSerde, final String topic); /** @@ -994,10 +1017,10 @@ public interface KTable<K, V> { * <p> * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String) * #to(keySerde, valueSerde, partitioner, someTopicName)} and - * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}. + * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. - * {@link StreamsBuilder#table(String, Materialized)}) + * {@link KStreamBuilder#table(String, String)}) * * @param keySerde key serde used to send key-value pairs, * if not specified the default key serde defined in the configuration will be used @@ -1011,8 +1034,10 @@ public interface KTable<K, V> { * @param queryableStoreName the state store name used for the result {@code KTable}. * If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, StreamPartitioner, String)()} * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} - * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} - * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} + * @deprecated use {@link #toStream()} followed by + * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner))} and + * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(queryableStoreName))} + * to read back as a {@code KTable} */ @Deprecated KTable<K, V> through(final Serde<K> keySerde, @@ -1029,10 +1054,10 @@ public interface KTable<K, V> { * <p> * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String) * #to(keySerde, valueSerde, partitioner, someTopicName)} and - * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}. + * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. - * {@link StreamsBuilder#table(String, Materialized)}) + * {@link KStreamBuilder#table(String, String)}) * * @param keySerde key serde used to send key-value pairs, * if not specified the default key serde defined in the configuration will be used @@ -1045,8 +1070,10 @@ public interface KTable<K, V> { * @param topic the topic name * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} - * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} - * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} + * @deprecated use {@link #toStream()} followed by + * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner))} and + * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier))} + * to read back as a {@code KTable} */ @Deprecated KTable<K, V> through(final Serde<K> keySerde, @@ -1063,10 +1090,10 @@ public interface KTable<K, V> { * <p> * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String) * #to(keySerde, valueSerde, partitioner, someTopicName)} and - * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}. + * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf. - * {@link StreamsBuilder#table(String)}) + * {@link KStreamBuilder#table(String)}) * * @param keySerde key serde used to send key-value pairs, * if not specified the default key serde defined in the configuration will be used @@ -1078,8 +1105,9 @@ public interface KTable<K, V> { * be used * @param topic the topic name * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} - * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} - * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} + * @deprecated use {@link #toStream()} followed by + * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner))} and + * {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to read back as a {@code KTable} */ @Deprecated KTable<K, V> through(final Serde<K> keySerde, @@ -1094,7 +1122,7 @@ public interface KTable<K, V> { * started). * * @param topic the topic name - * @deprecated use {@link #toStream()} followed by {@link KStream#to(String)} + * @deprecated use {@link #toStream()} followed by {@link KStream#to(String) to(topic)} */ @Deprecated void to(final String topic); @@ -1108,7 +1136,8 @@ public interface KTable<K, V> { * @param partitioner the function used to determine how records are distributed among partitions of the topic, * if not specified producer's {@link DefaultPartitioner} will be used * @param topic the topic name - * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} + * @deprecated use {@link #toStream()} followed by + * {@link KStream#to(String, Produced) to(topic, Produced.withStreamPartitioner(partitioner)} */ @Deprecated void to(final StreamPartitioner<? super K, ? super V> partitioner, @@ -1127,7 +1156,8 @@ public interface KTable<K, V> { * @param valSerde value serde used to send key-value pairs, * if not specified the default value serde defined in the configuration will be used * @param topic the topic name - * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} + * @deprecated use {@link #toStream()} followed by + * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde)} */ @Deprecated void to(final Serde<K> keySerde, @@ -1149,7 +1179,8 @@ public interface KTable<K, V> { * {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will * be used * @param topic the topic name - * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} + * @deprecated use {@link #toStream()} followed by + * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partioner)} */ @Deprecated void to(final Serde<K> keySerde, @@ -1243,7 +1274,7 @@ public interface KTable<K, V> { * @param <KR> the key type of the result {@link KGroupedTable} * @param <VR> the value type of the result {@link KGroupedTable} * @return a {@link KGroupedTable} that contains the re-grouped records of the original {@code KTable} - * @deprecated use {@link #groupBy(KeyValueMapper, Serialized)} + * @deprecated use {@link #groupBy(KeyValueMapper, Serialized) groupBy(selector, Serialized.with(keySerde, valueSerde)} */ @Deprecated <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector, @@ -1475,7 +1506,7 @@ public interface KTable<K, V> { * {@link ValueJoiner}, one for each matched record-pair with the same key * @see #leftJoin(KTable, ValueJoiner, Materialized) * @see #outerJoin(KTable, ValueJoiner, Materialized) - * @deprecated use {@link #join(KTable, ValueJoiner, Materialized)} + * @deprecated use {@link #join(KTable, ValueJoiner, Materialized) join(other, joiner, Materialized.as(queryableStoreName).withValueSerde(joinSerde)} */ @Deprecated <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, @@ -1554,7 +1585,7 @@ public interface KTable<K, V> { * {@link ValueJoiner}, one for each matched record-pair with the same key * @see #leftJoin(KTable, ValueJoiner, Materialized) * @see #outerJoin(KTable, ValueJoiner, Materialized) - * @deprecated use {@link #join(KTable, ValueJoiner, Materialized)} + * @deprecated use {@link #join(KTable, ValueJoiner, Materialized) join(other, joiner, Materialized.as(KeyValueByteStoreSupplier)} */ @Deprecated <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, @@ -1811,7 +1842,7 @@ public interface KTable<K, V> { * left {@code KTable} * @see #join(KTable, ValueJoiner, Materialized) * @see #outerJoin(KTable, ValueJoiner, Materialized) - * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized)} + * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized) leftJoin(other, joiner, Materialized.as(queryableStoreName).withValueSerde(joinSerde)} */ @Deprecated <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, @@ -1898,7 +1929,7 @@ public interface KTable<K, V> { * left {@code KTable} * @see #join(KTable, ValueJoiner, Materialized) * @see #outerJoin(KTable, ValueJoiner, Materialized) - * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized)} + * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized) leftJoin(other, joiner, Materialized.as(KeyValueByteStoreSupplier)} */ @Deprecated <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, @@ -2153,7 +2184,7 @@ public interface KTable<K, V> { * both {@code KTable}s * @see #join(KTable, ValueJoiner, Materialized) * @see #leftJoin(KTable, ValueJoiner, Materialized) - * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized)} + * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized) outerJoin(other, joiner, Materialized.as(queryableStoreName).withValueSerde(joinSerde)} */ @Deprecated <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, @@ -2239,7 +2270,7 @@ public interface KTable<K, V> { * both {@code KTable}s * @see #join(KTable, ValueJoiner) * @see #leftJoin(KTable, ValueJoiner) - * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized)} + * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized) outerJoin(other, joiner, Materialized.as(KeyValueByteStoreSupplier)} */ @Deprecated <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java index d3db087..8d2c22a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java @@ -105,7 +105,6 @@ public class Printed<K, V> { * The example below shows how to customize output data. * <pre>{@code * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() { - * @Override * public String apply(Integer key, String value) { * return String.format("(%d, %s)", key, value); * } http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java index e7cc234..1acd587 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java @@ -28,12 +28,12 @@ import org.apache.kafka.streams.KeyValue; * {@code Reducer} can be used to implement aggregation functions like sum, min, or max. * * @param <V> value type - * @see KGroupedStream#reduce(Reducer, String) - * @see KGroupedStream#reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier) - * @see KGroupedStream#reduce(Reducer, Windows, String) - * @see KGroupedStream#reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier) - * @see KGroupedStream#reduce(Reducer, SessionWindows, String) - * @see KGroupedStream#reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier) + * @see KGroupedStream#reduce(Reducer) + * @see KGroupedStream#reduce(Reducer, Materialized) + * @see TimeWindowedKStream#reduce(Reducer) + * @see TimeWindowedKStream#reduce(Reducer, Materialized) + * @see SessionWindowedKStream#reduce(Reducer) + * @see SessionWindowedKStream#reduce(Reducer, Materialized) * @see Aggregator */ public interface Reducer<V> { http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java index 3c3ef7e..ddc0371 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java @@ -127,13 +127,13 @@ public interface SessionWindowedKStream<K, V> { * @param initializer the instance of {@link Initializer} * @param aggregator the instance of {@link Aggregator} * @param sessionMerger the instance of {@link Merger} - * @param <T> the value type of the resulting {@link KTable} + * @param <VR> the value type of the resulting {@link KTable} * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent * the latest (rolling) aggregate for each key within a window */ - <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, - final Aggregator<? super K, ? super V, T> aggregator, - final Merger<? super K, T> sessionMerger); + <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, + final Aggregator<? super K, ? super V, VR> aggregator, + final Merger<? super K, VR> sessionMerger); /** * Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}. @@ -222,10 +222,10 @@ public interface SessionWindowedKStream<K, V> { * <pre>{@code * // At the example of a Reducer<Long> * new Reducer<Long>() { - * @Override * public Long apply(Long aggValue, Long currValue) { * return aggValue + currValue; * } + * } * }</pre> * <p> * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's @@ -260,8 +260,8 @@ public interface SessionWindowedKStream<K, V> { * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. - * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. - * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null} + * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. + * @param materializedAs an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null} * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent * the latest (rolling) aggregate for each key within a window */ http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java index 6e06461..693bee0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java @@ -58,12 +58,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor; * @see TimeWindows * @see UnlimitedWindows * @see JoinWindows - * @see KGroupedStream#count(SessionWindows, String) - * @see KGroupedStream#count(SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier) - * @see KGroupedStream#reduce(Reducer, SessionWindows, String) - * @see KGroupedStream#reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier) - * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, String) - * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.streams.processor.StateStoreSupplier) + * @see KGroupedStream#windowedBy(SessionWindows) * @see TimestampExtractor */ public final class SessionWindows { http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java index 38362ad..f9090c5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java @@ -45,12 +45,7 @@ import java.util.Map; * @see SessionWindows * @see UnlimitedWindows * @see JoinWindows - * @see KGroupedStream#count(Windows, String) - * @see KGroupedStream#count(Windows, org.apache.kafka.streams.processor.StateStoreSupplier) - * @see KGroupedStream#reduce(Reducer, Windows, String) - * @see KGroupedStream#reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier) - * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String) - * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier) + * @see KGroupedStream#windowedBy(Windows) * @see TimestampExtractor */ public final class TimeWindows extends Windows<TimeWindow> { http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java index e116a8b..a3b9338 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java @@ -34,12 +34,7 @@ import java.util.Map; * @see TimeWindows * @see SessionWindows * @see JoinWindows - * @see KGroupedStream#count(Windows, String) - * @see KGroupedStream#count(Windows, org.apache.kafka.streams.processor.StateStoreSupplier) - * @see KGroupedStream#reduce(Reducer, Windows, String) - * @see KGroupedStream#reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier) - * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String) - * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier) + * @see KGroupedStream#windowedBy(Windows) * @see TimestampExtractor */ public final class UnlimitedWindows extends Windows<UnlimitedWindow> { http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java index 576706e..7728317 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java @@ -26,18 +26,8 @@ package org.apache.kafka.streams.kstream; * Thus, a windowed {@link KTable} has type {@code <Windowed<K>,V>}. * * @param <K> type of the key - * @see KGroupedStream#count(Windows, String) - * @see KGroupedStream#count(Windows, org.apache.kafka.streams.processor.StateStoreSupplier) - * @see KGroupedStream#count(SessionWindows, String) - * @see KGroupedStream#count(SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier) - * @see KGroupedStream#reduce(Reducer, Windows, String) - * @see KGroupedStream#reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier) - * @see KGroupedStream#reduce(Reducer, SessionWindows, String) - * @see KGroupedStream#reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier) - * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String) - * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier) - * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.streams.processor.StateStoreSupplier) - * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, String) + * @see KGroupedStream#windowedBy(Windows) + * @see KGroupedStream#windowedBy(SessionWindows) */ public class Windowed<K> { http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java index e6bc112..4d07f9b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java @@ -33,7 +33,7 @@ public class QueryableStoreTypes { * A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore} * @param <K> key type of the store * @param <V> value type of the store - * @return {@link KeyValueStoreType} + * @return {@link QueryableStoreTypes.KeyValueStoreType} */ public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, V>> keyValueStore() { return new KeyValueStoreType<>(); @@ -43,7 +43,7 @@ public class QueryableStoreTypes { * A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore} * @param <K> key type of the store * @param <V> value type of the store - * @return {@link WindowStoreType} + * @return {@link QueryableStoreTypes.WindowStoreType} */ public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, V>> windowStore() { return new WindowStoreType<>(); @@ -53,7 +53,7 @@ public class QueryableStoreTypes { * A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore} * @param <K> key type of the store * @param <V> value type of the store - * @return {@link SessionStoreType} + * @return {@link QueryableStoreTypes.SessionStoreType} */ public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, V>> sessionStore() { return new SessionStoreType<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/state/Stores.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index c0beb9e..05ebd33 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -190,7 +190,7 @@ public class Stores { * * @param name the name of the store * @return the factory that can be used to specify other options or configurations for the store; never null - * @deprected use {@link #persistentKeyValueStore(String)}, {@link #persistentWindowStore(String, long, int, long, boolean)} + * @deprecated use {@link #persistentKeyValueStore(String)}, {@link #persistentWindowStore(String, long, int, long, boolean)} * {@link #persistentSessionStore(String, long)}, {@link #lruMap(String, int)}, or {@link #inMemoryKeyValueStore(String)} */ @Deprecated