Repository: kafka
Updated Branches:
  refs/heads/trunk 716330a5b -> 3dcbbf703


http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 66ec0d7..1abc5e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -166,7 +166,7 @@ public interface KTable<K, V> {
      *                          (i.e., that would be equivalent to calling 
{@link KTable#filter(Predicate)}.
      * @return a {@code KTable} that contains only those records that satisfy 
the given predicate
      * @see #filterNot(Predicate, Materialized)
-     * @deprecated use {@link #filter(Predicate, Materialized)}
+     * @deprecated use {@link #filter(Predicate, Materialized) 
filter(predicate, Materialized.as(queryableStoreName))}
      */
     @Deprecated
     KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final 
String queryableStoreName);
@@ -203,7 +203,7 @@ public interface KTable<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be 
{@code null}.
      * @return a {@code KTable} that contains only those records that satisfy 
the given predicate
      * @see #filterNot(Predicate, Materialized)
-     * @deprecated use {@link #filter(Predicate, Materialized)}
+     * @deprecated use {@link #filter(Predicate, Materialized) 
filter(predicate, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final 
StateStoreSupplier<KeyValueStore> storeSupplier);
@@ -297,7 +297,7 @@ public interface KTable<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be 
{@code null}.
      * @return a {@code KTable} that contains only those records that do 
<em>not</em> satisfy the given predicate
      * @see #filter(Predicate, Materialized)
-     * @deprecated use {@link #filterNot(Predicate, Materialized)}
+     * @deprecated use {@link #filterNot(Predicate, Materialized) 
filterNot(predicate, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, 
final StateStoreSupplier<KeyValueStore> storeSupplier);
@@ -336,7 +336,7 @@ public interface KTable<K, V> {
      * (i.e., that would be equivalent to calling {@link 
KTable#filterNot(Predicate)}.
      * @return a {@code KTable} that contains only those records that do 
<em>not</em> satisfy the given predicate
      * @see #filter(Predicate, Materialized)
-     * @deprecated use {@link #filter(Predicate, Materialized)}
+     * @deprecated use {@link #filter(Predicate, Materialized) 
filterNot(predicate, Materialized.as(queryableStoreName))}
      */
     @Deprecated
     KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, 
final String queryableStoreName);
@@ -463,7 +463,7 @@ public interface KTable<K, V> {
      * @param <VR>   the value type of the result {@code KTable}
      *
      * @return a {@code KTable} that contains records with unmodified keys and 
new values (possibly of different type)
-     * @deprecated use {@link #mapValues(ValueMapper, Materialized)}
+     * @deprecated use {@link #mapValues(ValueMapper, Materialized) 
mapValues(mapper, 
Materialized.as(queryableStoreName).withValueSerde(valueSerde))}
      */
     @Deprecated
     <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> 
mapper, final Serde<VR> valueSerde, final String queryableStoreName);
@@ -507,7 +507,7 @@ public interface KTable<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be 
{@code null}.
      * @param <VR>   the value type of the result {@code KTable}
      * @return a {@code KTable} that contains records with unmodified keys and 
new values (possibly of different type)
-     * @deprecated use {@link #mapValues(ValueMapper, Materialized)}
+     * @deprecated use {@link #mapValues(ValueMapper, Materialized) 
mapValues(mapper, 
Materialized.as(KeyValueByteStoreSupplier).withValueSerde(valueSerde))}
      */
     @Deprecated
     <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> 
mapper,
@@ -530,7 +530,8 @@ public interface KTable<K, V> {
      * update record.
      * @deprecated Use the Interactive Queries APIs (e.g., {@link 
KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the 
keys of a KTable. Alternatively
-     * convert to a KStream using {@code toStream()} and then use {@link 
KStream#print()} on the result.
+     * convert to a {@link KStream} using {@link #toStream()} and then use
+     * {@link KStream#print(Printed) print(Printed.toSysOut())} on the result.
      */
     @Deprecated
     void print();
@@ -551,7 +552,8 @@ public interface KTable<K, V> {
      * @param label the name used to label the key/value pairs printed to the 
console
      * @deprecated Use the Interactive Queries APIs (e.g., {@link 
KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the 
keys of a KTable. Alternatively
-     * convert to a KStream using {@code toStream()} and then use {@link 
KStream#print(String)} on the result.
+     * convert to a {@link KStream} using {@link #toStream()} and then use
+     * {@link KStream#print(Printed) 
print(Printed.toSysOut().withLabel(lable))} on the result.
      */
     @Deprecated
     void print(final String label);
@@ -574,7 +576,8 @@ public interface KTable<K, V> {
      * @param valSerde value serde used to deserialize value if type is {@code 
byte[]}
      * @deprecated Use the Interactive Queries APIs (e.g., {@link 
KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the 
keys of a KTable. Alternatively
-     * convert to a KStream using {@code toStream()} and then use {@link 
KStream#print(Serde, Serde)} on the result.
+     * convert to a {@link KStream} using {@link #toStream()} and then use
+     * {@link KStream#print(Printed) 
print(Printed.toSysOut().withKeyValueMapper(...)} on the result.
      */
     @Deprecated
     void print(final Serde<K> keySerde,
@@ -598,7 +601,8 @@ public interface KTable<K, V> {
      * @param label the name used to label the key/value pairs printed to the 
console
      * @deprecated Use the Interactive Queries APIs (e.g., {@link 
KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the 
keys of a KTable. Alternatively
-     * convert to a KStream using {@code toStream()} and then use {@link 
KStream#print(Serde, Serde, String)} on the result.
+     * convert to a {@link KStream} using {@link #toStream()} and then use
+     * {@link KStream#print(Printed) 
print(Printed.toSysOut().withLabel(label).withKeyValueMapper(...)} on the 
result.
      */
     @Deprecated
     void print(final Serde<K> keySerde,
@@ -622,7 +626,8 @@ public interface KTable<K, V> {
      * @param filePath name of file to write to
      * @deprecated Use the Interactive Queries APIs (e.g., {@link 
KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the 
keys of a KTable. Alternatively
-     * convert to a KStream using {@code toStream()} and then use {@link 
KStream#writeAsText(String)}} on the result.
+     * convert to a {@link KStream} using {@link #toStream()} and then use
+     * {@link KStream#print(Printed) print(Printed.toFile(filePath)} on the 
result.
      */
     @Deprecated
     void writeAsText(final String filePath);
@@ -644,7 +649,8 @@ public interface KTable<K, V> {
      * @param label the name used to label the key/value pairs printed out to 
the console
      * @deprecated Use the Interactive Queries APIs (e.g., {@link 
KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the 
keys of a KTable. Alternatively
-     * convert to a KStream using {@code toStream()} and then use {@link 
KStream#writeAsText(String, String)}} on the result.
+     * convert to a {@link KStream} using {@link #toStream()} and then use
+     * {@link KStream#print(Printed) 
print(Printed.toFile(filePath).withLabel(label)} on the result.
      */
     @Deprecated
     void writeAsText(final String filePath,
@@ -669,7 +675,8 @@ public interface KTable<K, V> {
      * @param valSerde value serde used to deserialize value if type is {@code 
byte[]}
      * @deprecated Use the Interactive Queries APIs (e.g., {@link 
KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the 
keys of a KTable. Alternatively
-     * convert to a KStream using {@code toStream()} and then use {@link 
KStream#writeAsText(String, Serde, Serde)}} on the result.
+     * convert to a {@link KStream} using {@link #toStream()} and then use
+     * {@link KStream#print(Printed) 
print(Printed.toFile(filePath).withKeyValueMapper(...)} on the result.
      */
     @Deprecated
     void  writeAsText(final String filePath,
@@ -695,8 +702,8 @@ public interface KTable<K, V> {
      * @param valSerde value serde used to deserialize value if type is {@code 
byte[]}
      * @deprecated Use the Interactive Queries APIs (e.g., {@link 
KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the 
keys of a KTable. Alternatively
-     * convert to a KStream using {@code toStream()} and then use {@link 
KStream#writeAsText(String, String, Serde, Serde)}} on the result.
-
+     * convert to a {@link KStream} using {@link #toStream()} and then use
+     * {@link KStream#print(Printed) 
print(Printed.toFile(filePath).withLabel(label).withKeyValueMapper(...)} on the 
result.
      */
     @Deprecated
     void writeAsText(final String filePath,
@@ -714,7 +721,8 @@ public interface KTable<K, V> {
      * @param action an action to perform on each record
      * @deprecated Use the Interactive Queries APIs (e.g., {@link 
KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the 
keys of a KTable. Alternatively
-     * convert to a KStream using {@code toStream()} and then use {@link 
KStream#foreach(ForeachAction)}} on the result.
+     * convert to a {@link KStream} using {@link #toStream()} and then use
+     * {@link KStream#foreach(ForeachAction) foreach(action)} on the result.
      */
     @Deprecated
     void foreach(final ForeachAction<? super K, ? super V> action);
@@ -763,18 +771,19 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
-     * {@link StreamsBuilder#table(String, Materialized) 
StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link KStreamBuilder#table(String, String) 
KStreamBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state 
store with the given store name (cf.
-     * {@link StreamsBuilder#table(String, Materialized)})
+     * {@link KStreamBuilder#table(String, String)})
      * The store name must be a valid Kafka topic name and cannot contain 
characters other than ASCII alphanumerics, '.', '_' and '-'.
      *
      * @param topic     the topic name
      * @param queryableStoreName the state store name used for the result 
{@code KTable}; valid characters are ASCII
-     *                  alphanumerics, '.', '_' and '-'. If {@code null} this 
is the equivalent of {@link KTable#through(String)()}
+     *                  alphanumerics, '.', '_' and '-'. If {@code null} this 
is the equivalent of {@link KTable#through(String)}
      * @return a {@code KTable} that contains the exact same (and potentially 
repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link 
KStream#to(String)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code 
KTable}
+     * @deprecated use {@link #toStream()} followed by {@link 
KStream#to(String) to(topic)} and
+     * {@link StreamsBuilder#table(String, Materialized) 
StreamsBuilder#table(topic, Materialized.as(queryableStoreName))}
+     * to read back as a {@code KTable}
      */
     @Deprecated
     KTable<K, V> through(final String topic,
@@ -787,17 +796,18 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
-     * {@link StreamsBuilder#table(String, Materialized) 
StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link KStreamBuilder#table(String, String) 
KStreamBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state 
store with the given store name (cf.
-     * {@link StreamsBuilder#table(String, Materialized)})
+     * {@link KStreamBuilder#table(String, String)})
      * The store name must be a valid Kafka topic name and cannot contain 
characters other than ASCII alphanumerics, '.', '_' and '-'.
      *
      * @param topic     the topic name
      * @param storeSupplier user defined state store supplier. Cannot be 
{@code null}.
      * @return a {@code KTable} that contains the exact same (and potentially 
repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link 
KStream#to(String)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code 
KTable}
+     * @deprecated use {@link #toStream()} followed by {@link 
KStream#to(String) to(topic)} and
+     * and {@link StreamsBuilder#table(String, Materialized) 
StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier))}
+     * to read back as a {@code KTable}
      */
     @Deprecated
     KTable<K, V> through(final String topic,
@@ -810,15 +820,15 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
-     * {@link StreamsBuilder#table(String) 
StreamsBuilder#table(someTopicName)}.
+     * {@link KStreamBuilder#table(String) 
KStreamBuilder#table(someTopicName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state 
store with an internal store name (cf.
-     * {@link StreamsBuilder#table(String)})
+     * {@link KStreamBuilder#table(String)})
      *
      * @param topic     the topic name
      * @return a {@code KTable} that contains the exact same (and potentially 
repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link 
KStream#to(String)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code 
KTable}
+     * @deprecated use {@link #toStream()} followed by {@link 
KStream#to(String) to(topic)} and
+     * and {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to 
read back as a {@code KTable}
      */
     @Deprecated
     KTable<K, V> through(final String topic);
@@ -831,17 +841,18 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(StreamPartitioner, String) 
#to(partitioner, someTopicName)} and
-     * {@link StreamsBuilder#table(String) 
StreamsBuilder#table(someTopicName)}.
+     * {@link KStreamBuilder#table(String) 
KStreamBuilder#table(someTopicName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state 
store with an internal store name (cf.
-     * {@link StreamsBuilder#table(String)})
+     * {@link KStreamBuilder#table(String)})
      *
      * @param partitioner the function used to determine how records are 
distributed among partitions of the topic,
      *                    if not specified producer's {@link 
DefaultPartitioner} will be used
      * @param topic       the topic name
      * @return a {@code KTable} that contains the exact same (and potentially 
repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link 
KStream#to(String, Produced)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code 
KTable}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, 
Produced.streamPartitioner(partitioner))} and
+     * {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to 
read back as a {@code KTable}
      */
     @Deprecated
     KTable<K, V> through(final StreamPartitioner<? super K, ? super V> 
partitioner,
@@ -855,10 +866,10 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(StreamPartitioner, String) 
#to(partitioner, someTopicName)} and
-     * {@link StreamsBuilder#table(String, Materialized) 
StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link KStreamBuilder#table(String, String) 
KStreamBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state 
store with the given store name (cf.
-     * {@link StreamsBuilder#table(String, Materialized)})
+     * {@link KStreamBuilder#table(String, String)})
      *
      * @param partitioner the function used to determine how records are 
distributed among partitions of the topic,
      *                    if not specified producer's {@link 
DefaultPartitioner} will be used
@@ -866,8 +877,10 @@ public interface KTable<K, V> {
      * @param queryableStoreName   the state store name used for the result 
{@code KTable}.
      *                             If {@code null} this is the equivalent of 
{@link KTable#through(StreamPartitioner, String)}
      * @return a {@code KTable} that contains the exact same (and potentially 
repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link 
KStream#to(String, Produced)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code 
KTable}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, 
Produced.streamPartitioner(partitioner))} and
+     * {@link StreamsBuilder#table(String, Materialized) 
StreamsBuilder#table(topic, Materialized.as(queryableStoreName))}
+     * to read back as a {@code KTable}
      */
     @Deprecated
     KTable<K, V> through(final StreamPartitioner<? super K, ? super V> 
partitioner,
@@ -882,18 +895,20 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(StreamPartitioner, String) 
#to(partitioner, someTopicName)} and
-     * {@link StreamsBuilder#table(String, Materialized) 
StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link KStreamBuilder#table(String, String) 
KStreamBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state 
store with the given store name (cf.
-     * {@link StreamsBuilder#table(String, Materialized)})
+     * {@link KStreamBuilder#table(String, String)})
      *
      * @param partitioner the function used to determine how records are 
distributed among partitions of the topic,
      *                    if not specified producer's {@link 
DefaultPartitioner} will be used
      * @param topic       the topic name
      * @param storeSupplier user defined state store supplier. Cannot be 
{@code null}.
      * @return a {@code KTable} that contains the exact same (and potentially 
repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link 
KStream#to(String, Produced)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code 
KTable}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, 
Produced.streamPartitioner(partitioner))} and
+     * {@link StreamsBuilder#table(String, Materialized) 
StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier)}
+     * to read back as a {@code KTable}
      */
     @Deprecated
     KTable<K, V> through(final StreamPartitioner<? super K, ? super V> 
partitioner,
@@ -909,10 +924,10 @@ public interface KTable<K, V> {
      * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, String) 
#to(keySerde, valueSerde, someTopicName)} and
-     * {@link StreamsBuilder#table(String, Materialized) 
StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link KStreamBuilder#table(String, String) 
StreamsBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state 
store with the given store name (cf.
-     * {@link StreamsBuilder#table(String, Materialized)})
+     * {@link KStreamBuilder#table(String, String)})
      *
      * @param keySerde  key serde used to send key-value pairs,
      *                  if not specified the default key serde defined in the 
configuration will be used
@@ -920,13 +935,16 @@ public interface KTable<K, V> {
      *                  if not specified the default value serde defined in 
the configuration will be used
      * @param topic     the topic name
      * @param queryableStoreName the state store name used for the result 
{@code KTable}.
-     *                           If {@code null} this is the equivalent of 
{@link KTable#through(Serde, Serde, String)()}
+     *                           If {@code null} this is the equivalent of 
{@link KTable#through(Serde, Serde, String)}
      * @return a {@code KTable} that contains the exact same (and potentially 
repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link 
KStream#to(String, Produced)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code 
KTable}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, 
valSerde))} and
+     * {@link StreamsBuilder#table(String, Materialized) 
StreamsBuilder#table(topic, Materialized.as(queryableStoreName))}
+     * to read back as a {@code KTable}
      */
     @Deprecated
-    KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
+    KTable<K, V> through(final Serde<K> keySerde,
+                         final Serde<V> valSerde,
                          final String topic,
                          final String queryableStoreName);
 
@@ -939,7 +957,7 @@ public interface KTable<K, V> {
      * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, String) 
#to(keySerde, valueSerde, someTopicName)} and
-     * {@link StreamsBuilder#table(String, Materialized) 
StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link KStreamBuilder#table(String, String) 
KStreamBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state 
store with the given store name (cf.
      * {@link StreamsBuilder#table(String, Materialized)})
@@ -951,11 +969,14 @@ public interface KTable<K, V> {
      * @param topic     the topic name
      * @param storeSupplier user defined state store supplier. Cannot be 
{@code null}.
      * @return a {@code KTable} that contains the exact same (and potentially 
repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link 
KStream#to(String, Produced)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code 
KTable}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, 
valSerde))} and
+     * {@link StreamsBuilder#table(String, Materialized) 
StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier)}
+     * to read back as a {@code KTable}
      */
     @Deprecated
-    KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
+    KTable<K, V> through(final Serde<K> keySerde,
+                         final Serde<V> valSerde,
                          final String topic,
                          final StateStoreSupplier<KeyValueStore> 
storeSupplier);
 
@@ -968,10 +989,10 @@ public interface KTable<K, V> {
      * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, String) 
#to(keySerde, valueSerde, someTopicName)} and
-     * {@link StreamsBuilder#table(String) 
StreamsBuilder#table(someTopicName)}.
+     * {@link KStreamBuilder#table(String) 
KStreamBuilder#table(someTopicName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state 
store with an interna; store name (cf.
-     * {@link StreamsBuilder#table(String)})
+     * {@link KStreamBuilder#table(String)})
      *
      * @param keySerde  key serde used to send key-value pairs,
      *                  if not specified the default key serde defined in the 
configuration will be used
@@ -979,11 +1000,13 @@ public interface KTable<K, V> {
      *                  if not specified the default value serde defined in 
the configuration will be used
      * @param topic     the topic name
      * @return a {@code KTable} that contains the exact same (and potentially 
repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link 
KStream#to(String, Produced)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code 
KTable}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, 
valSerde))}
+     * and {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to 
read back as a {@code KTable}
      */
     @Deprecated
-    KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
+    KTable<K, V> through(final Serde<K> keySerde,
+                         final Serde<V> valSerde,
                          final String topic);
 
     /**
@@ -994,10 +1017,10 @@ public interface KTable<K, V> {
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, 
StreamPartitioner, String)
      * #to(keySerde, valueSerde, partitioner, someTopicName)} and
-     * {@link StreamsBuilder#table(String, Materialized) 
StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link KStreamBuilder#table(String, String) 
KStreamBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state 
store with the given store name (cf.
-     * {@link StreamsBuilder#table(String, Materialized)})
+     * {@link KStreamBuilder#table(String, String)})
      *
      * @param keySerde    key serde used to send key-value pairs,
      *                    if not specified the default key serde defined in 
the configuration will be used
@@ -1011,8 +1034,10 @@ public interface KTable<K, V> {
      * @param queryableStoreName  the state store name used for the result 
{@code KTable}.
      *                            If {@code null} this is the equivalent of 
{@link KTable#through(Serde, Serde, StreamPartitioner, String)()}
      * @return a {@code KTable} that contains the exact same (and potentially 
repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link 
KStream#to(String, Produced)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code 
KTable}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, 
valSerde, partitioner))} and
+     * {@link StreamsBuilder#table(String, Materialized) 
StreamsBuilder#table(topic, Materialized.as(queryableStoreName))}
+     * to read back as a {@code KTable}
      */
     @Deprecated
     KTable<K, V> through(final Serde<K> keySerde,
@@ -1029,10 +1054,10 @@ public interface KTable<K, V> {
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, 
StreamPartitioner, String)
      * #to(keySerde, valueSerde, partitioner, someTopicName)} and
-     * {@link StreamsBuilder#table(String, Materialized) 
StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link KStreamBuilder#table(String, String) 
KStreamBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state 
store with the given store name (cf.
-     * {@link StreamsBuilder#table(String, Materialized)})
+     * {@link KStreamBuilder#table(String, String)})
      *
      * @param keySerde    key serde used to send key-value pairs,
      *                    if not specified the default key serde defined in 
the configuration will be used
@@ -1045,8 +1070,10 @@ public interface KTable<K, V> {
      * @param topic      the topic name
      * @param storeSupplier user defined state store supplier. Cannot be 
{@code null}.
      * @return a {@code KTable} that contains the exact same (and potentially 
repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link 
KStream#to(String, Produced)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code 
KTable}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, 
valSerde, partitioner))} and
+     * {@link StreamsBuilder#table(String, Materialized) 
StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier))}
+     * to read back as a {@code KTable}
      */
     @Deprecated
     KTable<K, V> through(final Serde<K> keySerde,
@@ -1063,10 +1090,10 @@ public interface KTable<K, V> {
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, 
StreamPartitioner, String)
      * #to(keySerde, valueSerde, partitioner, someTopicName)} and
-     * {@link StreamsBuilder#table(String) 
StreamsBuilder#table(someTopicName)}.
+     * {@link KStreamBuilder#table(String) 
KStreamBuilder#table(someTopicName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state 
store with an internal store name (cf.
-     * {@link StreamsBuilder#table(String)})
+     * {@link KStreamBuilder#table(String)})
      *
      * @param keySerde    key serde used to send key-value pairs,
      *                    if not specified the default key serde defined in 
the configuration will be used
@@ -1078,8 +1105,9 @@ public interface KTable<K, V> {
      *                    be used
      * @param topic      the topic name
      * @return a {@code KTable} that contains the exact same (and potentially 
repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link 
KStream#to(String, Produced)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code 
KTable}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, 
valSerde, partitioner))} and
+     * {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to 
read back as a {@code KTable}
      */
     @Deprecated
     KTable<K, V> through(final Serde<K> keySerde,
@@ -1094,7 +1122,7 @@ public interface KTable<K, V> {
      * started).
      *
      * @param topic the topic name
-     * @deprecated use {@link #toStream()} followed by {@link 
KStream#to(String)}
+     * @deprecated use {@link #toStream()} followed by {@link 
KStream#to(String) to(topic)}
      */
     @Deprecated
     void to(final String topic);
@@ -1108,7 +1136,8 @@ public interface KTable<K, V> {
      * @param partitioner the function used to determine how records are 
distributed among partitions of the topic,
      *                    if not specified producer's {@link 
DefaultPartitioner} will be used
      * @param topic       the topic name
-     * @deprecated use {@link #toStream()} followed by {@link 
KStream#to(String, Produced)}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, 
Produced.withStreamPartitioner(partitioner)}
      */
     @Deprecated
     void to(final StreamPartitioner<? super K, ? super V> partitioner,
@@ -1127,7 +1156,8 @@ public interface KTable<K, V> {
      * @param valSerde value serde used to send key-value pairs,
      *                 if not specified the default value serde defined in the 
configuration will be used
      * @param topic    the topic name
-     * @deprecated use {@link #toStream()} followed by {@link 
KStream#to(String, Produced)}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, 
valSerde)}
      */
     @Deprecated
     void to(final Serde<K> keySerde,
@@ -1149,7 +1179,8 @@ public interface KTable<K, V> {
      *                    {@link WindowedStreamPartitioner} will be 
used&mdash;otherwise {@link DefaultPartitioner} will
      *                    be used
      * @param topic      the topic name
-     * @deprecated use {@link #toStream()} followed by {@link 
KStream#to(String, Produced)}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, 
valSerde, partioner)}
      */
     @Deprecated
     void to(final Serde<K> keySerde,
@@ -1243,7 +1274,7 @@ public interface KTable<K, V> {
      * @param <KR>       the key type of the result {@link KGroupedTable}
      * @param <VR>       the value type of the result {@link KGroupedTable}
      * @return a {@link KGroupedTable} that contains the re-grouped records of 
the original {@code KTable}
-     * @deprecated use {@link #groupBy(KeyValueMapper, Serialized)}
+     * @deprecated use {@link #groupBy(KeyValueMapper, Serialized) 
groupBy(selector, Serialized.with(keySerde, valueSerde)}
      */
     @Deprecated
     <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? 
super V, KeyValue<KR, VR>> selector,
@@ -1475,7 +1506,7 @@ public interface KTable<K, V> {
      * {@link ValueJoiner}, one for each matched record-pair with the same key
      * @see #leftJoin(KTable, ValueJoiner, Materialized)
      * @see #outerJoin(KTable, ValueJoiner, Materialized)
-     * @deprecated use {@link #join(KTable, ValueJoiner, Materialized)}
+     * @deprecated use {@link #join(KTable, ValueJoiner, Materialized) 
join(other, joiner, 
Materialized.as(queryableStoreName).withValueSerde(joinSerde)}
      */
     @Deprecated
     <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
@@ -1554,7 +1585,7 @@ public interface KTable<K, V> {
      * {@link ValueJoiner}, one for each matched record-pair with the same key
      * @see #leftJoin(KTable, ValueJoiner, Materialized)
      * @see #outerJoin(KTable, ValueJoiner, Materialized)
-     * @deprecated use {@link #join(KTable, ValueJoiner, Materialized)}
+     * @deprecated use {@link #join(KTable, ValueJoiner, Materialized) 
join(other, joiner, Materialized.as(KeyValueByteStoreSupplier)}
      */
     @Deprecated
     <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
@@ -1811,7 +1842,7 @@ public interface KTable<K, V> {
      * left {@code KTable}
      * @see #join(KTable, ValueJoiner, Materialized)
      * @see #outerJoin(KTable, ValueJoiner, Materialized)
-     * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized)}
+     * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized) 
leftJoin(other, joiner, 
Materialized.as(queryableStoreName).withValueSerde(joinSerde)}
      */
     @Deprecated
     <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
@@ -1898,7 +1929,7 @@ public interface KTable<K, V> {
      * left {@code KTable}
      * @see #join(KTable, ValueJoiner, Materialized)
      * @see #outerJoin(KTable, ValueJoiner, Materialized)
-     * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized)}
+     * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized) 
leftJoin(other, joiner, Materialized.as(KeyValueByteStoreSupplier)}
      */
     @Deprecated
     <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
@@ -2153,7 +2184,7 @@ public interface KTable<K, V> {
      * both {@code KTable}s
      * @see #join(KTable, ValueJoiner, Materialized)
      * @see #leftJoin(KTable, ValueJoiner, Materialized)
-     * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized)}
+     * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized) 
outerJoin(other, joiner, 
Materialized.as(queryableStoreName).withValueSerde(joinSerde)}
      */
     @Deprecated
     <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
@@ -2239,7 +2270,7 @@ public interface KTable<K, V> {
      * both {@code KTable}s
      * @see #join(KTable, ValueJoiner)
      * @see #leftJoin(KTable, ValueJoiner)
-     * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized)}
+     * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized) 
outerJoin(other, joiner, Materialized.as(KeyValueByteStoreSupplier)}
      */
     @Deprecated
     <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
index d3db087..8d2c22a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
@@ -105,7 +105,6 @@ public class Printed<K, V> {
      * The example below shows how to customize output data.
      * <pre>{@code
      * final KeyValueMapper<Integer, String, String> mapper = new 
KeyValueMapper<Integer, String, String>() {
-     *     @Override
      *     public String apply(Integer key, String value) {
      *         return String.format("(%d, %s)", key, value);
      *     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
index e7cc234..1acd587 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
@@ -28,12 +28,12 @@ import org.apache.kafka.streams.KeyValue;
  * {@code Reducer} can be used to implement aggregation functions like sum, 
min, or max.
  *
  * @param <V> value type
- * @see KGroupedStream#reduce(Reducer, String)
- * @see KGroupedStream#reduce(Reducer, 
org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, Windows, String)
- * @see KGroupedStream#reduce(Reducer, 
org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, SessionWindows, String)
- * @see KGroupedStream#reduce(Reducer, SessionWindows, 
org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#reduce(Reducer)
+ * @see KGroupedStream#reduce(Reducer, Materialized)
+ * @see TimeWindowedKStream#reduce(Reducer)
+ * @see TimeWindowedKStream#reduce(Reducer, Materialized)
+ * @see SessionWindowedKStream#reduce(Reducer)
+ * @see SessionWindowedKStream#reduce(Reducer, Materialized)
  * @see Aggregator
  */
 public interface Reducer<V> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
index 3c3ef7e..ddc0371 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -127,13 +127,13 @@ public interface SessionWindowedKStream<K, V> {
      * @param initializer    the instance of {@link Initializer}
      * @param aggregator     the instance of {@link Aggregator}
      * @param sessionMerger  the instance of {@link Merger}
-     * @param <T>           the value type of the resulting {@link KTable}
+     * @param <VR>            the value type of the resulting {@link KTable}
      * @return a windowed {@link KTable} that contains "update" records with 
unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
      */
-    <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
-                                         final Aggregator<? super K, ? super 
V, T> aggregator,
-                                         final Merger<? super K, T> 
sessionMerger);
+    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                           final Aggregator<? super K, ? super 
V, VR> aggregator,
+                                           final Merger<? super K, VR> 
sessionMerger);
 
     /**
      * Aggregate the values of records in this stream by the grouped key and 
defined {@link SessionWindows}.
@@ -222,10 +222,10 @@ public interface SessionWindowedKStream<K, V> {
      * <pre>{@code
      * // At the example of a Reducer<Long>
      * new Reducer<Long>() {
-     *   @Override
      *   public Long apply(Long aggValue, Long currValue) {
      *     return aggValue + currValue;
      *   }
+     * }
      * }</pre>
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and 
the new aggregate will be the record's
@@ -260,8 +260,8 @@ public interface SessionWindowedKStream<K, V> {
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"queryableStoreName" is the
      * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link 
KafkaStreams#toString()}.
-     * @param reducer           a {@link Reducer} that computes a new 
aggregate result. Cannot be {@code null}.
-     * @param materialized     an instance of {@link Materialized} used to 
materialize a state store. Cannot be {@code null}
+     * @param reducer a {@link Reducer} that computes a new aggregate result. 
Cannot be {@code null}.
+     * @param materializedAs an instance of {@link Materialized} used to 
materialize a state store. Cannot be {@code null}
      * @return a windowed {@link KTable} that contains "update" records with 
unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index 6e06461..693bee0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -58,12 +58,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
  * @see TimeWindows
  * @see UnlimitedWindows
  * @see JoinWindows
- * @see KGroupedStream#count(SessionWindows, String)
- * @see KGroupedStream#count(SessionWindows, 
org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, SessionWindows, String)
- * @see KGroupedStream#reduce(Reducer, SessionWindows, 
org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, 
SessionWindows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, 
SessionWindows, org.apache.kafka.common.serialization.Serde, 
org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#windowedBy(SessionWindows)
  * @see TimestampExtractor
  */
 public final class SessionWindows {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index 38362ad..f9090c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -45,12 +45,7 @@ import java.util.Map;
  * @see SessionWindows
  * @see UnlimitedWindows
  * @see JoinWindows
- * @see KGroupedStream#count(Windows, String)
- * @see KGroupedStream#count(Windows, 
org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, Windows, String)
- * @see KGroupedStream#reduce(Reducer, Windows, 
org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, 
org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, 
org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#windowedBy(Windows)
  * @see TimestampExtractor
  */
 public final class TimeWindows extends Windows<TimeWindow> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index e116a8b..a3b9338 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -34,12 +34,7 @@ import java.util.Map;
  * @see TimeWindows
  * @see SessionWindows
  * @see JoinWindows
- * @see KGroupedStream#count(Windows, String)
- * @see KGroupedStream#count(Windows, 
org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, Windows, String)
- * @see KGroupedStream#reduce(Reducer, Windows, 
org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, 
org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, 
org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#windowedBy(Windows)
  * @see TimestampExtractor
  */
 public final class UnlimitedWindows extends Windows<UnlimitedWindow> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
index 576706e..7728317 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
@@ -26,18 +26,8 @@ package org.apache.kafka.streams.kstream;
  * Thus, a windowed {@link KTable} has type {@code <Windowed<K>,V>}.
  *
  * @param <K> type of the key
- * @see KGroupedStream#count(Windows, String)
- * @see KGroupedStream#count(Windows, 
org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#count(SessionWindows, String)
- * @see KGroupedStream#count(SessionWindows, 
org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, Windows, String)
- * @see KGroupedStream#reduce(Reducer, Windows, 
org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, SessionWindows, String)
- * @see KGroupedStream#reduce(Reducer, SessionWindows, 
org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, 
org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, 
org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, 
SessionWindows, org.apache.kafka.common.serialization.Serde, 
org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, 
SessionWindows, org.apache.kafka.common.serialization.Serde, String)
+ * @see KGroupedStream#windowedBy(Windows)
+ * @see KGroupedStream#windowedBy(SessionWindows)
  */
 public class Windowed<K> {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java 
b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
index e6bc112..4d07f9b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
@@ -33,7 +33,7 @@ public class QueryableStoreTypes {
      * A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore}
      * @param <K>   key type of the store
      * @param <V>   value type of the store
-     * @return  {@link KeyValueStoreType}
+     * @return  {@link QueryableStoreTypes.KeyValueStoreType}
      */
     public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, V>> 
keyValueStore() {
         return new KeyValueStoreType<>();
@@ -43,7 +43,7 @@ public class QueryableStoreTypes {
      * A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore}
      * @param <K>   key type of the store
      * @param <V>   value type of the store
-     * @return  {@link WindowStoreType}
+     * @return  {@link QueryableStoreTypes.WindowStoreType}
      */
     public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, V>> 
windowStore() {
         return new WindowStoreType<>();
@@ -53,7 +53,7 @@ public class QueryableStoreTypes {
      * A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore}
      * @param <K>   key type of the store
      * @param <V>   value type of the store
-     * @return  {@link SessionStoreType}
+     * @return  {@link QueryableStoreTypes.SessionStoreType}
      */
     public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, V>> 
sessionStore() {
         return new SessionStoreType<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java 
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index c0beb9e..05ebd33 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -190,7 +190,7 @@ public class Stores {
      *
      * @param name the name of the store
      * @return the factory that can be used to specify other options or 
configurations for the store; never null
-     * @deprected use {@link #persistentKeyValueStore(String)}, {@link 
#persistentWindowStore(String, long, int, long, boolean)}
+     * @deprecated use {@link #persistentKeyValueStore(String)}, {@link 
#persistentWindowStore(String, long, int, long, boolean)}
      * {@link #persistentSessionStore(String, long)}, {@link #lruMap(String, 
int)}, or {@link #inMemoryKeyValueStore(String)}
      */
     @Deprecated

Reply via email to