Repository: kafka
Updated Branches:
  refs/heads/0.10.2 05b1ebd9e -> 903548f1a


http://git-wip-us.apache.org/repos/asf/kafka/blob/903548f1/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
index 7b68137..a7bd889 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
@@ -31,84 +31,118 @@ import java.util.Map;
 public interface StreamsMetrics {
 
     /**
-     * Get read-only handle on global metrics registry
+     * Get read-only handle on global metrics registry.
+     *
      * @return Map of all metrics.
      */
     Map<MetricName, ? extends Metric> metrics();
 
     /**
-     * Add a latency and throughput sensor and default associated metrics. 
Metrics include both latency ones
-     * (average and max latency) and throughput ones (operations/time unit).
+     * Add a latency and throughput sensor for a specific operation, which 
will include the following sensors:
+     * <ol>
+     *   <li>average latency</li>
+     *   <li>max latency</li>
+     *   <li>throughput (num.operations / time unit)</li>
+     * </ol>
+     * Also create a parent sensor with the same metrics that aggregates all 
entities with the same operation under the
+     * same scope if it has not been created.
      *
-     * @param scopeName Name of the scope, could be the type of the state 
store, etc.
-     * @param entityName Name of the entity, could be the name of the state 
store instance, etc.
-     * @param recordingLevel The recording level (e.g., INFO or DEBUG) for 
this sensor.
-     * @param operationName Name of the operation, could be get / put / delete 
/ etc.
-     * @param tags Additional tags of the sensor.
+     * @param scopeName      name of the scope, could be the type of the state 
store, etc.
+     * @param entityName     name of the entity, could be the name of the 
state store instance, etc.
+     * @param operationName  name of the operation, could be get / put / 
delete / etc.
+     * @param recordingLevel the recording level (e.g., INFO or DEBUG) for 
this sensor.
+     * @param tags           additional tags of the sensor
      * @return The added sensor.
      */
-    Sensor addLatencyAndThroughputSensor(String scopeName, String entityName, 
String operationName, Sensor.RecordingLevel recordingLevel, String... tags);
+    Sensor addLatencyAndThroughputSensor(final String scopeName,
+                                         final String entityName,
+                                         final String operationName,
+                                         final Sensor.RecordingLevel 
recordingLevel,
+                                         final String... tags);
 
     /**
-     * Record the given latency value of the sensor. If the passed sensor 
includes throughput metrics,
-     * e.g., when created by the {@link #addLatencyAndThroughputSensor(String, 
String, String, Sensor.RecordingLevel, String...)}
-     * method, then the throughput metrics will also be recorded from this 
event.
+     * Record the given latency value of the sensor.
+     * If the passed sensor includes throughput metrics, e.g., when created by 
the
+     * {@link #addLatencyAndThroughputSensor(String, String, String, 
Sensor.RecordingLevel, String...)} method, then the
+     * throughput metrics will also be recorded from this event.
      *
-     * @param sensor sensor whose latency we are recording.
+     * @param sensor  sensor whose latency we are recording.
      * @param startNs start of measurement time in nanoseconds.
-     * @param endNs end of measurement time in nanoseconds.
+     * @param endNs   end of measurement time in nanoseconds.
      */
-    void recordLatency(Sensor sensor, long startNs, long endNs);
+    void recordLatency(final Sensor sensor,
+                       final long startNs,
+                       final long endNs);
 
     /**
-     * Add a throughput sensor and default associated metrics. Metrics include 
throughput ones
-     * (operations/time unit). This sensor is a strict subset of the sensor 
created by
+     * Add a throughput sensor for a specific operation:
+     * <ol>
+     *   <li>throughput (num.operations / time unit)</li>
+     * </ol>
+     * Also create a parent sensor with the same metrics that aggregates all 
entities with the same operation under the
+     * same scope if it has not been created.
+     * This sensor is a strict subset of the sensors created by
      * {@link #addLatencyAndThroughputSensor(String, String, String, 
Sensor.RecordingLevel, String...)}.
      *
-     * @param scopeName Name of the scope, could be the type of the state 
store, etc.
-     * @param entityName Name of the entity, could be the name of the state 
store instance, etc.
-     * @param recordingLevel The recording level (e.g., INFO or DEBUG) for 
this sensor.
-     * @param operationName Name of the operation, could be get / put / delete 
/ etc.
-     * @param tags Additional tags of the sensor.
+     * @param scopeName      name of the scope, could be the type of the state 
store, etc.
+     * @param entityName     name of the entity, could be the name of the 
state store instance, etc.
+     * @param operationName  name of the operation, could be get / put / 
delete / etc.
+     * @param recordingLevel the recording level (e.g., INFO or DEBUG) for 
this sensor.
+     * @param tags           additional tags of the sensor
      * @return The added sensor.
      */
-    Sensor addThroughputSensor(String scopeName, String entityName, String 
operationName, Sensor.RecordingLevel recordingLevel, String... tags);
+    Sensor addThroughputSensor(final String scopeName,
+                               final String entityName,
+                               final String operationName,
+                               final Sensor.RecordingLevel recordingLevel,
+                               final String... tags);
 
     /**
-     * Records the throughput value of a sensor.
-     * @param sensor addSensor whose throughput we are recording.
-     * @param value throughput value.
+     * Record the throughput value of a sensor.
+     *
+     * @param sensor sensor whose throughput we are recording
+     * @param value  throughput value
      */
-    void recordThroughput(Sensor sensor, long value);
+    void recordThroughput(final Sensor sensor,
+                          final long value);
 
 
     /**
      * Generic method to create a sensor.
-     * Note that for most cases it is advisable to use {@link 
#addThroughputSensor(String, String, String, Sensor.RecordingLevel, String...)}
+     * Note that for most cases it is advisable to use
+     * {@link #addThroughputSensor(String, String, String, 
Sensor.RecordingLevel, String...)}
      * or {@link #addLatencyAndThroughputSensor(String, String, String, 
Sensor.RecordingLevel, String...)} to ensure
-     * metric name well-formedness and conformity with the rest of the streams 
code base. However,
-     * if the above two methods are not sufficient, this method can also be 
used.
-     * @param name Name of the sensor.
-     * @param recordingLevel The recording level (e.g., INFO or DEBUG) for 
this sensor.
+     * metric name well-formedness and conformity with the rest of the streams 
code base.
+     * However, if the above two methods are not sufficient, this method can 
also be used.
+     *
+     * @param name           name of the sensor.
+     * @param recordingLevel the recording level (e.g., INFO or DEBUG) for 
this sensor
+     * @return The added sensor.
      */
-    Sensor addSensor(String name, Sensor.RecordingLevel recordingLevel);
+    Sensor addSensor(final String name,
+                     final Sensor.RecordingLevel recordingLevel);
 
     /**
      * Generic method to create a sensor with parent sensors.
-     * Note that for most cases it is advisable to use {@link 
#addThroughputSensor(String, String, String, Sensor.RecordingLevel, String...)}
+     * Note that for most cases it is advisable to use
+     * {@link #addThroughputSensor(String, String, String, 
Sensor.RecordingLevel, String...)}
      * or {@link #addLatencyAndThroughputSensor(String, String, String, 
Sensor.RecordingLevel, String...)} to ensure
-     * metric name well-formedness and conformity with the rest of the streams 
code base. However,
-     * if the above two methods are not sufficient, this method can also be 
used.
-     * @param name Name of the sensor.
-     * @param recordingLevel The recording level (e.g., INFO or DEBUG) for 
this sensor.
+     * metric name well-formedness and conformity with the rest of the streams 
code base.
+     * However, if the above two methods are not sufficient, this method can 
also be used.
+     *
+     * @param name           name of the sensor
+     * @param recordingLevel the recording level (e.g., INFO or DEBUG) for 
this sensor
+     * @return The added sensor.
      */
-    Sensor addSensor(String name, Sensor.RecordingLevel recordingLevel, 
Sensor... parents);
+    Sensor addSensor(final String name,
+                     final Sensor.RecordingLevel recordingLevel,
+                     final Sensor... parents);
 
     /**
      * Remove a sensor.
-     * @param sensor Sensor to be removed.
+     * @param sensor sensor to be removed
      */
-    void removeSensor(Sensor sensor);
+    void removeSensor(final Sensor sensor);
 }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/903548f1/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 6cfd202..f1f1338 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -353,6 +353,7 @@ public interface KGroupedStream<K, V> {
      * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
+     * String storeName = storeSupplier.name();
      * ReadOnlyKeyValueStore<String,Long> localStore = 
streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long sumForKey = localStore.get(key); // key must be local (application 
state is shared over all running Kafka Streams instances)
@@ -554,9 +555,11 @@ public interface KGroupedStream<K, V> {
     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link SessionStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) 
KafkaStreams#store(...)}:
+     * {@link KafkaStreams#store(String, QueryableStoreType) 
KafkaStreams#store(...)}.
+     * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
+     * String storeName = storeSupplier.name();
      * ReadOnlySessionStore<String,Long> sessionStore = 
streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
      * String key = "some-key";
      * KeyValueIterator<Windowed<String>, Long> sumForKeyForSession = 
localWindowStore.fetch(key); // key must be local (application state is shared 
over all running Kafka Streams instances)
@@ -666,6 +669,7 @@ public interface KGroupedStream<K, V> {
      * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // some aggregation on value type double
+     * String storeName = storeSupplier.name();
      * ReadOnlyKeyValueStore<String,Long> localStore = 
streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long aggForKey = localStore.get(key); // key must be local (application 
state is shared over all running Kafka Streams instances)

http://git-wip-us.apache.org/repos/asf/kafka/blob/903548f1/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index aecd8ab..8f67d45 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -19,15 +19,20 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.internals.GlobalKTableImpl;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.kstream.internals.KTableImpl;
 import org.apache.kafka.streams.kstream.internals.KTableSource;
 import 
org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
 
 import java.util.Collections;
@@ -35,113 +40,139 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 /**
- * {@link KStreamBuilder} is a subclass of {@link TopologyBuilder} that 
provides the Kafka Streams DSL
- * for users to specify computational logic and translates the given logic to 
a {@link org.apache.kafka.streams.processor.internals.ProcessorTopology}.
+ * {@link KStreamBuilder} provides the high-level Kafka Streams DSL to specify 
a Kafka Streams topology.
+ *
+ * @see TopologyBuilder
+ * @see KStream
+ * @see KTable
+ * @see GlobalKTable
  */
 public class KStreamBuilder extends TopologyBuilder {
 
     private final AtomicInteger index = new AtomicInteger(0);
 
     /**
-     * Create a new {@link KStreamBuilder} instance.
-     */
-    public KStreamBuilder() {
-        super();
-    }
-
-    /**
-     * Create a {@link KStream} instance from the specified topics.
-     * The default deserializers specified in the config are used.
+     * Create a {@link KStream} from the specified topics.
+     * The default {@code "auto.offset.reset"} strategy and default key and 
value deserializers as specified in the
+     * {@link StreamsConfig config} are used.
+     * <p>
+     * If multiple topics are specified there is no ordering guarantee for 
records from different topics.
      * <p>
-     * If multiple topics are specified there are nor ordering guaranteed for 
records from different topics.
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case it is the user's responsibility to repartition 
the data before any key based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
      *
-     * @param topics    the topic names; must contain at least one topic name
+     * @param topics the topic names; must contain at least one topic name
      * @return a {@link KStream} for the specified topics
      */
-    public <K, V> KStream<K, V> stream(String... topics) {
+    public <K, V> KStream<K, V> stream(final String... topics) {
         return stream(null, null, null, topics);
     }
 
-
     /**
-     * Create a {@link KStream} instance from the specified topics.
-     * The default deserializers specified in the config are used.
+     * Create a {@link KStream} from the specified topics.
+     * The default key and value deserializers as specified in the {@link 
StreamsConfig config} are used.
+     * <p>
+     * If multiple topics are specified there is no ordering guarantee for 
records from different topics.
      * <p>
-     * If multiple topics are specified there are nor ordering guaranteed for 
records from different topics.
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case it is the user's responsibility to repartition 
the data before any key based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
      *
-     * @param offsetReset the auto offset reset policy to use for this stream 
if no committed offsets available; acceptable values are earliest or latest
-     * @param topics    the topic names; must contain at least one topic name
+     * @param offsetReset the {@code "auto.offset.reset"} policy to use for 
the specified topics if no valid committed
+     *                    offsets are available
+     * @param topics      the topic names; must contain at least one topic name
      * @return a {@link KStream} for the specified topics
      */
-    public <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, String... 
topics) {
+    public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
+                                       final String... topics) {
         return stream(offsetReset, null, null, topics);
     }
 
-
     /**
-     * Create a {@link KStream} instance from the specified Pattern.
-     * The default deserializers specified in the config are used.
+     * Create a {@link KStream} from the specified topic pattern.
+     * The default {@code "auto.offset.reset"} strategy and default key and 
value deserializers as specified in the
+     * {@link StreamsConfig config} are used.
+     * <p>
+     * If multiple topics are matched by the specified pattern, the created 
{@link KStream} will read data from all of
+     * them and there is no ordering guarantee between records from different 
topics.
      * <p>
-     * If multiple topics are matched by the specified pattern, the created 
stream will read data from all of them,
-     * and there is no ordering guarantee between records from different topics
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case it is the user's responsibility to repartition 
the data before any key based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
      *
-     * @param topicPattern    the Pattern to match for topic names
+     * @param topicPattern the pattern to match for topic names
      * @return a {@link KStream} for topics matching the regex pattern.
      */
-    public <K, V> KStream<K, V> stream(Pattern topicPattern) {
+    public <K, V> KStream<K, V> stream(final Pattern topicPattern) {
         return stream(null, null, null, topicPattern);
     }
 
     /**
-     * Create a {@link KStream} instance from the specified Pattern.
-     * The default deserializers specified in the config are used.
+     * Create a {@link KStream} from the specified topic pattern.
+     * The default key and value deserializers as specified in the {@link 
StreamsConfig config} are used.
      * <p>
-     * If multiple topics are matched by the specified pattern, the created 
stream will read data from all of them,
-     * and there is no ordering guarantee between records from different topics
+     * If multiple topics are matched by the specified pattern, the created 
{@link KStream} will read data from all of
+     * them and there is no ordering guarantee between records from different 
topics.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case it is the user's responsibility to repartition 
the data before any key based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
      *
-     * @param offsetReset the auto offset reset policy to use for this stream 
if no committed offsets available; acceptable values are earliest or latest
-     * @param topicPattern    the Pattern to match for topic names
+     * @param offsetReset  the {@code "auto.offset.reset"} policy to use for 
the matched topics if no valid committed
+     *                     offsets are available
+     * @param topicPattern the pattern to match for topic names
      * @return a {@link KStream} for topics matching the regex pattern.
      */
-    public <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, Pattern 
topicPattern) {
+    public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, 
final Pattern topicPattern) {
         return stream(offsetReset, null, null, topicPattern);
     }
 
-
-
     /**
-     * Create a {@link KStream} instance from the specified topics.
+     * Create a {@link KStream} from the specified topics.
+     * The default {@code "auto.offset.reset"} strategy as specified in the 
{@link StreamsConfig config} is used.
      * <p>
-     * If multiple topics are specified there are nor ordering guaranteed for 
records from different topics.
+     * If multiple topics are specified there is no ordering guarantee for 
records from different topics.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case it is the user's responsibility to repartition 
the data before any key based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
      *
-     * @param keySerde  key serde used to read this source {@link KStream},
-     *                  if not specified the default serde defined in the 
configs will be used
-     * @param valSerde  value serde used to read this source {@link KStream},
-     *                  if not specified the default serde defined in the 
configs will be used
-     * @param topics    the topic names; must contain at least one topic name
+     * @param keySerde key serde used to read this source {@link KStream},
+     *                 if not specified the default serde defined in the 
configs will be used
+     * @param valSerde value serde used to read this source {@link KStream},
+     *                 if not specified the default serde defined in the 
configs will be used
+     * @param topics   the topic names; must contain at least one topic name
      * @return a {@link KStream} for the specified topics
      */
-    public <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valSerde, 
String... topics) {
+    public <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> 
valSerde, final String... topics) {
         return stream(null, keySerde, valSerde, topics);
     }
 
 
     /**
-     * Create a {@link KStream} instance from the specified topics.
+     * Create a {@link KStream} from the specified topics.
      * <p>
-     * If multiple topics are specified there are nor ordering guaranteed for 
records from different topics.
-     *
-     * @param offsetReset the auto offset reset policy to use for this stream 
if no committed offsets available; acceptable values are earliest or latest
+     * If multiple topics are specified there is no ordering guarantee for 
records from different topics.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case it is the user's responsibility to repartition 
the data before any key based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
      *
-     * @param keySerde  key serde used to read this source {@link KStream},
-     *                  if not specified the default serde defined in the 
configs will be used
-     * @param valSerde  value serde used to read this source {@link KStream},
-     *                  if not specified the default serde defined in the 
configs will be used
-     * @param topics    the topic names; must contain at least one topic name
+     * @param offsetReset the {@code "auto.offset.reset"} policy to use for 
the specified topics if no valid committed
+     *                    offsets are available
+     * @param keySerde    key serde used to read this source {@link KStream},
+     *                    if not specified the default serde defined in the 
configs will be used
+     * @param valSerde    value serde used to read this source {@link KStream},
+     *                    if not specified the default serde defined in the 
configs will be used
+     * @param topics      the topic names; must contain at least one topic name
      * @return a {@link KStream} for the specified topics
      */
-    public <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, Serde<K> 
keySerde, Serde<V> valSerde, String... topics) {
-        String name = newName(KStreamImpl.SOURCE_NAME);
+    public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
+                                       final Serde<K> keySerde,
+                                       final Serde<V> valSerde,
+                                       final String... topics) {
+        final String name = newName(KStreamImpl.SOURCE_NAME);
 
         addSource(offsetReset, name,  keySerde == null ? null : 
keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), 
topics);
 
@@ -150,38 +181,53 @@ public class KStreamBuilder extends TopologyBuilder {
 
 
     /**
-     * Create a {@link KStream} instance from the specified Pattern.
+     * Create a {@link KStream} from the specified topic pattern.
+     * The default {@code "auto.offset.reset"} strategy as specified in the 
{@link StreamsConfig config} is used.
+     * <p>
+     * If multiple topics are matched by the specified pattern, the created 
{@link KStream} will read data from all of
+     * them and there is no ordering guarantee between records from different 
topics.
      * <p>
-     * If multiple topics are matched by the specified pattern, the created 
stream will read data from all of them,
-     * and there is no ordering guarantee between records from different 
topics.
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case it is the user's responsibility to repartition 
the data before any key based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
      *
-     * @param keySerde  key serde used to read this source {@link KStream},
-     *                  if not specified the default serde defined in the 
configs will be used
-     * @param valSerde  value serde used to read this source {@link KStream},
-     *                  if not specified the default serde defined in the 
configs will be used
-     * @param topicPattern    the Pattern to match for topic names
-     * @return a {@link KStream} for the specified topics
+     * @param keySerde     key serde used to read this source {@link KStream},
+     *                     if not specified the default serde defined in the 
configs will be used
+     * @param valSerde     value serde used to read this source {@link 
KStream},
+     *                     if not specified the default serde defined in the 
configs will be used
+     * @param topicPattern the pattern to match for topic names
+     * @return a {@link KStream} for topics matching the regex pattern.
      */
-    public <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valSerde, 
Pattern topicPattern) {
+    public <K, V> KStream<K, V> stream(final Serde<K> keySerde,
+                                       final Serde<V> valSerde,
+                                       final Pattern topicPattern) {
         return stream(null, keySerde, valSerde, topicPattern);
     }
 
     /**
-     * Create a {@link KStream} instance from the specified Pattern.
+     * Create a {@link KStream} from the specified topic pattern.
      * <p>
-     * If multiple topics are matched by the specified pattern, the created 
stream will read data from all of them,
-     * and there is no ordering guarantee between records from different 
topics.
+     * If multiple topics are matched by the specified pattern, the created 
{@link KStream} will read data from all of
+     * them and there is no ordering guarantee between records from different 
topics.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case it is the user's responsibility to repartition 
the data before any key based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
      *
-     * @param offsetReset the auto offset reset policy to use for this stream 
if no committed offsets available; acceptable values are earliest or latest
-     * @param keySerde  key serde used to read this source {@link KStream},
-     *                  if not specified the default serde defined in the 
configs will be used
-     * @param valSerde  value serde used to read this source {@link KStream},
-     *                  if not specified the default serde defined in the 
configs will be used
-     * @param topicPattern    the Pattern to match for topic names
-     * @return a {@link KStream} for the specified topics
+     * @param offsetReset  the {@code "auto.offset.reset"} policy to use for 
the matched topics if no valid committed
+     *                     offsets are available
+     * @param keySerde     key serde used to read this source {@link KStream},
+     *                     if not specified the default serde defined in the 
configs will be used
+     * @param valSerde     value serde used to read this source {@link 
KStream},
+     *                     if not specified the default serde defined in the 
configs will be used
+     * @param topicPattern the pattern to match for topic names
+     * @return a {@link KStream} for topics matching the regex pattern.
      */
-    public <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, Serde<K> 
keySerde, Serde<V> valSerde, Pattern topicPattern) {
-        String name = newName(KStreamImpl.SOURCE_NAME);
+    public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
+                                       final Serde<K> keySerde,
+                                       final Serde<V> valSerde,
+                                       final Pattern topicPattern) {
+        final String name = newName(KStreamImpl.SOURCE_NAME);
 
         addSource(offsetReset, name, keySerde == null ? null : 
keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), 
topicPattern);
 
@@ -189,71 +235,152 @@ public class KStreamBuilder extends TopologyBuilder {
     }
 
     /**
-     * Create a {@link KTable} instance for the specified topic.
-     * Record keys of the topic should never by null, otherwise an exception 
will be thrown at runtime.
-     * The default deserializers specified in the config are used.
-     * The resulting {@link KTable} will be materialized in a local state 
store with the given store name.
-     * However, no new changelog topic is created in this case since the 
underlying topic acts as one.
+     * Create a {@link KTable} for the specified topic.
+     * The default {@code "auto.offset.reset"} strategy and default key and 
value deserializers as specified in the
+     * {@link StreamsConfig config} are used.
+     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link 
KeyValueStore} with the given
+     * {@code storeName}.
+     * However, no internal changelog topic is created since the original 
input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return 
a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) 
KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String,Long> localStore = 
streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local 
(application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using 
{@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka 
Streams application.
      *
-     * @param offsetReset the auto offset reset policy to use for this stream 
if no committed offsets available; acceptable values are earliest or latest
-     * @param topic     the topic name; cannot be null
-     * @param storeName the state store name used if this KTable is 
materialized, can be null if materialization not expected
-     * @return a {@link KTable} for the specified topics
+     * @param topic     the topic name; cannot be {@code null}
+     * @param storeName the state store name; cannot be {@code null}
+     * @return a {@link KTable} for the specified topic
      */
-    public <K, V> KTable<K, V> table(AutoOffsetReset offsetReset, String 
topic, final String storeName) {
-        return table(offsetReset, null, null, topic, storeName);
+    public <K, V> KTable<K, V> table(final String topic,
+                                     final String storeName) {
+        return table(null, null, null, topic, storeName);
     }
 
     /**
-     * Create a {@link KTable} instance for the specified topic.
-     * Record keys of the topic should never by null, otherwise an exception 
will be thrown at runtime.
-     * The default deserializers specified in the config are used.
-     * The resulting {@link KTable} will be materialized in a local state 
store with the given store name.
-     * However, no new changelog topic is created in this case since the 
underlying topic acts as one.
+     * Create a {@link KTable} for the specified topic.
+     * The default key and value deserializers as specified in the {@link 
StreamsConfig config} are used.
+     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link 
KeyValueStore} with the given
+     * {@code storeName}.
+     * However, no internal changelog topic is created since the original 
input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return 
a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) 
KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String,Long> localStore = 
streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local 
(application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using 
{@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka 
Streams application.
      *
-     * @param topic     the topic name; cannot be null
-     * @param storeName the state store name used if this KTable is 
materialized, can be null if materialization not expected
-     * @return a {@link KTable} for the specified topics
+     * @param offsetReset the {@code "auto.offset.reset"} policy to use for 
the specified topic if no valid committed
+     *                    offsets are available
+     * @param topic       the topic name; cannot be {@code null}
+     * @param storeName   the state store name; cannot be {@code null}
+     * @return a {@link KTable} for the specified topic
      */
-    public <K, V> KTable<K, V> table(String topic, final String storeName) {
-        return table(null, null, null, topic, storeName);
+    public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
+                                     final String topic,
+                                     final String storeName) {
+        return table(offsetReset, null, null, topic, storeName);
     }
 
-
     /**
-     * Create a {@link KTable} instance for the specified topic.
-     * Record keys of the topic should never by null, otherwise an exception 
will be thrown at runtime.
-     * The resulting {@link KTable} will be materialized in a local state 
store with the given store name.
-     * However, no new changelog topic is created in this case since the 
underlying topic acts as one.
+     * Create a {@link KTable} for the specified topic.
+     * The default {@code "auto.offset.reset"} strategy as specified in the 
{@link StreamsConfig config} is used.
+     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link 
KeyValueStore} with the given
+     * {@code storeName}.
+     * However, no internal changelog topic is created since the original 
input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return 
a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) 
KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String,Long> localStore = 
streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local 
(application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using 
{@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka 
Streams application.
      *
-     * @param keySerde   key serde used to send key-value pairs,
-     *                   if not specified the default key serde defined in the 
configuration will be used
-     * @param valSerde   value serde used to send key-value pairs,
-     *                   if not specified the default value serde defined in 
the configuration will be used
-     * @param topic      the topic name; cannot be null
-     * @param storeName  the state store name used for the materialized KTable
-     * @return a {@link KTable} for the specified topics
+     * @param keySerde  key serde used to send key-value pairs,
+     *                  if not specified the default key serde defined in the 
configuration will be used
+     * @param valSerde  value serde used to send key-value pairs,
+     *                  if not specified the default value serde defined in 
the configuration will be used
+     * @param topic     the topic name; cannot be {@code null}
+     * @param storeName the state store name; cannot be {@code null}
+     * @return a {@link KTable} for the specified topic
      */
-    public <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valSerde, 
String topic, final String storeName) {
+    public <K, V> KTable<K, V> table(final Serde<K> keySerde,
+                                     final Serde<V> valSerde,
+                                     final String topic,
+                                     final String storeName) {
         return table(null, keySerde, valSerde, topic, storeName);
     }
 
     /**
-     * Create a {@link KTable} instance for the specified topic.
-     * Record keys of the topic should never by null, otherwise an exception 
will be thrown at runtime.
-     * The resulting {@link KTable} will be materialized in a local state 
store with the given store name.
-     * However, no new changelog topic is created in this case since the 
underlying topic acts as one.
+     * Create a {@link KTable} for the specified topic.
+     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link 
KeyValueStore} with the given
+     * {@code storeName}.
+     * However, no internal changelog topic is created since the original 
input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return 
a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) 
KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String,Long> localStore = 
streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local 
(application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using 
{@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka 
Streams application.
      *
-     * @param offsetReset the auto offset reset policy to use for this stream 
if no committed offsets available; acceptable values are earliest or latest
-     * @param keySerde   key serde used to send key-value pairs,
-     *                   if not specified the default key serde defined in the 
configuration will be used
-     * @param valSerde   value serde used to send key-value pairs,
-     *                   if not specified the default value serde defined in 
the configuration will be used
-     * @param topic      the topic name; cannot be null
-     * @param storeName  the state store name used if this KTable is 
materialized, can be null if materialization not expected
-     * @return a {@link KTable} for the specified topics
+     * @param offsetReset the {@code "auto.offset.reset"} policy to use for 
the specified topic if no valid committed
+     *                    offsets are available
+     * @param keySerde    key serde used to send key-value pairs,
+     *                    if not specified the default key serde defined in 
the configuration will be used
+     * @param valSerde    value serde used to send key-value pairs,
+     *                    if not specified the default value serde defined in 
the configuration will be used
+     * @param topic       the topic name; cannot be {@code null}
+     * @param storeName   the state store name; cannot be {@code null}
+     * @return a {@link KTable} for the specified topic
      */
-    public <K, V> KTable<K, V> table(AutoOffsetReset offsetReset, Serde<K> 
keySerde, Serde<V> valSerde, String topic, final String storeName) {
+    public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
+                                     final Serde<K> keySerde,
+                                     final Serde<V> valSerde,
+                                     final String topic,
+                                     final String storeName) {
         final String source = newName(KStreamImpl.SOURCE_NAME);
         final String name = newName(KTableImpl.SOURCE_NAME);
         final ProcessorSupplier<K, V> processorSupplier = new 
KTableSource<>(storeName);
@@ -265,7 +392,7 @@ public class KStreamBuilder extends TopologyBuilder {
 
         // only materialize the KTable into a state store if the storeName is 
not null
         if (storeName != null) {
-            StateStoreSupplier storeSupplier = new 
RocksDBKeyValueStoreSupplier<>(storeName,
+            final StateStoreSupplier storeSupplier = new 
RocksDBKeyValueStoreSupplier<>(storeName,
                     keySerde,
                     valSerde,
                     false,
@@ -279,22 +406,70 @@ public class KStreamBuilder extends TopologyBuilder {
         return kTable;
     }
 
+    /**
+     * Create a {@link GlobalKTable} for the specified topic.
+     * The default key and value deserializers as specified in the {@link 
StreamsConfig config} are used.
+     * Input {@link KeyValue} pairs cannot have a {@code null} key (otherwise 
an exception will be thrown).
+     * <p>
+     * The resulting {@link GlobalKTable} will be materialized in a local 
{@link KeyValueStore} with the given
+     * {@code storeName}.
+     * However, no internal changelog topic is created since the original 
input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return 
a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) 
KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String,Long> localStore = 
streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key);
+     * }</pre>
+     * Note that {@link GlobalKTable} always applies {@code 
"auto.offset.reset"} strategy {@code "earliest"}
+     * regardless of the specified value in {@link StreamsConfig}.
+     *
+     * @param topic     the topic name; cannot be {@code null}
+     * @param storeName the state store name; cannot be {@code null}
+     * @return a {@link GlobalKTable} for the specified topic
+     */
+    public <K, V> GlobalKTable<K, V> globalTable(final String topic,
+                                                 final String storeName) {
+        return globalTable(null, null, topic, storeName);
+    }
 
     /**
-     * Create a new  {@link GlobalKTable} instance for the specified topic.
-     * Record keys of the topic should never by null, otherwise an exception 
will be thrown at runtime.
-     * The resulting {@link GlobalKTable} will be materialized in a local 
state store with the given store name.
-     * However, no new changelog topic is created in this case since the 
underlying topic acts as one.
-     * @param keySerde   key serde used to send key-value pairs,
-     *                   if not specified the default key serde defined in the 
configuration will be used
-     * @param valSerde   value serde used to send key-value pairs,
-     *                   if not specified the default value serde defined in 
the configuration will be used
-     * @param topic      the topic name; cannot be null
-     * @param storeName  the state store name used
-     * @return a {@link GlobalKTable} for the specified topics
+     * Create a {@link GlobalKTable} for the specified topic.
+     * The default key and value deserializers as specified in the {@link 
StreamsConfig config} are used.
+     * Input {@link KeyValue} pairs cannot have a {@code null} key (otherwise 
an exception will be thrown).
+     * <p>
+     * The resulting {@link GlobalKTable} will be materialized in a local 
{@link KeyValueStore} with the given
+     * {@code storeName}.
+     * However, no internal changelog topic is created since the original 
input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return 
a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) 
KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String,Long> localStore = 
streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key);
+     * }</pre>
+     * Note that {@link GlobalKTable} always applies {@code 
"auto.offset.reset"} strategy {@code "earliest"}
+     * regardless of the specified value in {@link StreamsConfig}.
+     *
+     * @param keySerde  key serde used to send key-value pairs,
+     *                  if not specified the default key serde defined in the 
configuration will be used
+     * @param valSerde  value serde used to send key-value pairs,
+     *                  if not specified the default value serde defined in 
the configuration will be used
+     * @param topic     the topic name; cannot be {@code null}
+     * @param storeName the state store name; cannot be {@code null}
+     * @return a {@link GlobalKTable} for the specified topic
      */
     @SuppressWarnings("unchecked")
-    public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, 
final Serde<V> valSerde, final String topic, final String storeName) {
+    public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+                                                 final Serde<V> valSerde,
+                                                 final String topic,
+                                                 final String storeName) {
         final String sourceName = newName(KStreamImpl.SOURCE_NAME);
         final String processorName = newName(KTableImpl.SOURCE_NAME);
         final KTableSource<K, V> tableSource = new KTableSource<>(storeName);
@@ -315,41 +490,27 @@ public class KStreamBuilder extends TopologyBuilder {
     }
 
     /**
-     * Create a new  {@link GlobalKTable} instance for the specified topic 
using the default key and value {@link Serde}s
-     * Record keys of the topic should never by null, otherwise an exception 
will be thrown at runtime.
-     * The resulting {@link GlobalKTable} will be materialized in a local 
state store with the given store name.
-     * However, no new changelog topic is created in this case since the 
underlying topic acts as one.
-     *
-     * @param topic      the topic name; cannot be null
-     * @param storeName  the state store name used if this KTable is 
materialized, can be null if materialization not expected
-     * @return a {@link GlobalKTable} for the specified topics
-     */
-    public <K, V> GlobalKTable<K, V> globalTable(final String topic, final 
String storeName) {
-        return globalTable(null, null, topic, storeName);
-    }
-
-    /**
-     * Create a new instance of {@link KStream} by merging the given streams.
+     * Create a new instance of {@link KStream} by merging the given {@link 
KStream}s.
      * <p>
-     * There are nor ordering guaranteed for records from different streams.
+     * There is no ordering guarantee for records from different {@link 
KStream}s.
      *
-     * @param streams   the instances of {@link KStream} to be merged
+     * @param streams the {@link KStream}s to be merged
      * @return a {@link KStream} containing all records of the given streams
      */
-    public <K, V> KStream<K, V> merge(KStream<K, V>... streams) {
+    public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
         return KStreamImpl.merge(this, streams);
     }
 
     /**
+     * <strong>This function is for internal usage only and should not be 
called.</strong>
+     * <p>
      * Create a unique processor name used for translation into the processor 
topology.
-     * This function is only for internal usage.
      *
-     * @param prefix    processor name prefix
+     * @param prefix processor name prefix
      * @return a new unique name
      */
-    public String newName(String prefix) {
+    public String newName(final String prefix) {
         return prefix + String.format("%010d", index.getAndIncrement());
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/903548f1/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
index 6df9481..e00a102 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
  *
  * @see FailOnInvalidTimestamp
  * @see LogAndSkipOnInvalidTimestamp
+ * @see UsePreviousTimeOnInvalidTimestamp
  */
 public class WallclockTimestampExtractor implements TimestampExtractor {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/903548f1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 5edc10d..8a9b255 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -135,7 +135,7 @@ public class StreamsKafkaClient {
     }
 
     /**
-     * Creates a set of new topics using batch request.
+     * Create a set of new topics using batch request.
      */
     public void createTopics(final Map<InternalTopicConfig, Integer> 
topicsMap, final int replicationFactor, final long 
windowChangeLogAdditionalRetention) {
 
@@ -253,7 +253,15 @@ public class StreamsKafkaClient {
         return metadataResponse.topicMetadata();
     }
 
-    public void checkBrokerCompatibility() {
+    /**
+     * Check if the used brokers have version 0.10.1.x or higher.
+     * <p>
+     * Note, for <em>pre</em> 0.10.x brokers the broker version cannot be 
checked and the client will hang and retry
+     * until it {@link StreamsConfig#REQUEST_TIMEOUT_MS_CONFIG times out}.
+     *
+     * @throws StreamsException if brokers have version 0.10.0.x
+     */
+    public void checkBrokerCompatibility() throws StreamsException {
         final ClientRequest clientRequest = kafkaClient.newClientRequest(
             getBrokerId(),
             new ApiVersionsRequest.Builder(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/903548f1/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 6ac87ae..9e59113 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -90,7 +90,6 @@ public class GlobalKTableIntegrationTest {
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
applicationId);
         streamsConfiguration
                 .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, 
CLUSTER.zKConnectString());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
         
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

http://git-wip-us.apache.org/repos/asf/kafka/blob/903548f1/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index b443abc..ac8abd4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -121,7 +121,6 @@ public class JoinIntegrationTest {
         
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
 
         STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
-        STREAMS_CONFIG.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, 
CLUSTER.zKConnectString());
         STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
         STREAMS_CONFIG.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());

http://git-wip-us.apache.org/repos/asf/kafka/blob/903548f1/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index eeb9177..7061507 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -133,7 +133,6 @@ public class KTableKTableJoinIntegrationTest {
 
         streamsConfig = new Properties();
         streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
-        streamsConfig.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, 
CLUSTER.zKConnectString());
         streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
         streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
         streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Reply via email to