Repository: kafka
Updated Branches:
  refs/heads/trunk 5e8958a85 -> d07bb1814


MINOR: comments on KStream methods, and fix generics

guozhangwang

Author: Yasuhiro Matsuda <[email protected]>

Reviewers: Guozhang Wang

Closes #591 from ymatsuda/comments


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d07bb181
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d07bb181
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d07bb181

Branch: refs/heads/trunk
Commit: d07bb1814010ca4d822e44330d1e8ea4b2839c80
Parents: 5e8958a
Author: Yasuhiro Matsuda <[email protected]>
Authored: Wed Nov 25 16:44:43 2015 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Wed Nov 25 16:44:43 2015 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KStream.java   | 33 +++++++++-----------
 .../kafka/streams/kstream/KStreamBuilder.java   |  8 ++---
 .../streams/kstream/internals/KStreamImpl.java  | 21 +++++--------
 .../kstream/internals/KStreamImplTest.java      |  2 +-
 4 files changed, 25 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d07bb181/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 8f0794c..992bd75 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -30,7 +30,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 public interface KStream<K, V> {
 
     /**
-     * Creates a new stream consists of all elements of this stream which satisfy a predicate
+     * Creates a new instance of KStream consists of all elements of this stream which satisfy a predicate
      *
      * @param predicate the instance of Predicate
      * @return the stream with only those elements that satisfy the predicate
@@ -38,7 +38,7 @@ public interface KStream<K, V> {
     KStream<K, V> filter(Predicate<K, V> predicate);
 
     /**
-     * Creates a new stream consists all elements of this stream which do not satisfy a predicate
+     * Creates a new instance of KStream consists all elements of this stream which do not satisfy a predicate
      *
      * @param predicate the instance of Predicate
      * @return the stream with only those elements that do not satisfy the predicate
@@ -56,30 +56,30 @@ public interface KStream<K, V> {
     <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper);
 
     /**
-     * Creates a new stream by applying transforming each value in this stream into a different value in the new stream.
+     * Creates a new instance of KStream by applying transforming each value in this stream into a different value in the new stream.
      *
      * @param mapper the instance of ValueMapper
      * @param <V1>   the value type of the new stream
-     * @return the mapped stream
+     * @return the instance of KStream
      */
     <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
 
     /**
-     * Creates a new stream by applying transforming each element in this stream into zero or more elements in the new stream.
+     * Creates a new instance of KStream by applying transforming each element in this stream into zero or more elements in the new stream.
      *
      * @param mapper the instance of KeyValueMapper
      * @param <K1>   the key type of the new stream
      * @param <V1>   the value type of the new stream
-     * @return the mapped stream
+     * @return the instance of KStream
      */
     <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper);
 
     /**
-     * Creates a new stream by applying transforming each value in this stream into zero or more values in the new stream.
+     * Creates a new instance of KStream by applying transforming each value in this stream into zero or more values in the new stream.
      *
      * @param processor the instance of Processor
      * @param <V1>      the value type of the new stream
-     * @return the mapped stream
+     * @return the instance of KStream
      */
     <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> processor);
 
@@ -98,7 +98,7 @@ public interface KStream<K, V> {
      * An element will be dropped if none of the predicates evaluate true.
      *
      * @param predicates the ordered list of Predicate instances
-     * @return the new streams that each contain those elements for which their Predicate evaluated to true.
+     * @return the instances of KStream that each contain those elements for which their Predicate evaluated to true.
      */
     KStream<K, V>[] branch(Predicate<K, V>... predicates);
 
@@ -107,14 +107,12 @@ public interface KStream<K, V> {
      * This is equivalent to calling to(topic) and from(topic).
      *
      * @param topic           the topic name
-     * @param <K1>            the key type of the new stream
-     * @param <V1>            the value type of the new stream
      * @return the new stream that consumes the given topic
      */
-    <K1, V1> KStream<K1, V1> through(String topic);
+    KStream<K, V> through(String topic);
 
     /**
-     * Sends key-value to a topic, also creates a new stream from the topic.
+     * Sends key-value to a topic, also creates a new instance of KStream from the topic.
      * This is equivalent to calling to(topic) and from(topic).
      *
      * @param topic           the topic name
@@ -126,11 +124,9 @@ public interface KStream<K, V> {
      *                        if not specified the default key deserializer defined in the configuration will be used
      * @param valDeserializer value deserializer used to create the new KStream,
      *                        if not specified the default value deserializer defined in the configuration will be used
-     * @param <K1>            the key type of the new stream
-     * @param <V1>            the value type of the new stream
      * @return the new stream that consumes the given topic
      */
-    <K1, V1> KStream<K1, V1> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K1> keyDeserializer, Deserializer<V1> valDeserializer);
+    KStream<K, V> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer);
 
     /**
      * Sends key-value to a topic using default serializers specified in the config.
@@ -155,7 +151,7 @@ public interface KStream<K, V> {
      *
      * @param transformerSupplier the class of TransformerDef
      * @param stateStoreNames the names of the state store used by the processor
-     * @return KStream
+     * @return the instance of KStream that contains transformed keys and values
      */
     <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames);
 
@@ -164,7 +160,7 @@ public interface KStream<K, V> {
      *
      * @param valueTransformerSupplier the class of TransformerDef
      * @param stateStoreNames the names of the state store used by the processor
-     * @return KStream
+     * @return the instance of KStream that contains transformed keys and values
      */
     <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier, String... stateStoreNames);
 
@@ -173,7 +169,6 @@ public interface KStream<K, V> {
      *
      * @param processorSupplier the supplier of the Processor to use
      * @param stateStoreNames the names of the state store used by the processor
-     * @return the new stream containing the processed output
      */
     void process(ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d07bb181/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index a95d08c..ae8f694 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -43,11 +43,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * @return KStream
      */
     public <K, V> KStream<K, V> from(String... topics) {
-        String name = newName(KStreamImpl.SOURCE_NAME);
-
-        addSource(name, topics);
-
-        return new KStreamImpl<>(this, name, Collections.singleton(name));
+        return from(null, null, topics);
     }
 
     /**
@@ -60,7 +56,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * @param topics          the topic names, if empty default to all the topics in the config
      * @return KStream
      */
-    public <K, V> KStream<K, V> from(Deserializer<? extends K> keyDeserializer, Deserializer<? extends V> valDeserializer, String... topics) {
+    public <K, V> KStream<K, V> from(Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer, String... topics) {
         String name = newName(KStreamImpl.SOURCE_NAME);
 
         addSource(name, keyDeserializer, valDeserializer, topics);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d07bb181/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index a408458..04aa8e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -32,7 +32,6 @@ import org.apache.kafka.streams.kstream.WindowSupplier;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 import java.lang.reflect.Array;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -187,25 +186,21 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> through(String topic,
-                                            Serializer<K> keySerializer,
-                                            Serializer<V> valSerializer,
-                                            Deserializer<K1> keyDeserializer,
-                                            Deserializer<V1> valDeserializer) {
+    public KStream<K, V> through(String topic,
+                                 Serializer<K> keySerializer,
+                                 Serializer<V> valSerializer,
+                                 Deserializer<K> keyDeserializer,
+                                 Deserializer<V> valDeserializer) {
         String sendName = topology.newName(SINK_NAME);
 
         topology.addSink(sendName, topic, keySerializer, valSerializer, this.name);
 
-        String sourceName = topology.newName(SOURCE_NAME);
-
-        topology.addSource(sourceName, keyDeserializer, valDeserializer, topic);
-
-        return new KStreamImpl<>(topology, sourceName, Collections.singleton(sourceName));
+        return topology.from(keyDeserializer, valDeserializer, topic);
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> through(String topic) {
-        return through(topic, (Serializer<K>) null, (Serializer<V>) null, (Deserializer<K1>) null, (Deserializer<V1>) null);
+    public KStream<K, V> through(String topic) {
+        return through(topic, null, null, null, null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/d07bb181/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index d924a34..1e775b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -119,7 +119,7 @@ public class KStreamImplTest {
 
         stream4.to("topic-5");
 
-        stream5.through("topic-6").process(new MockProcessorSupplier<>());
+        stream5.through("topic-6").process(new MockProcessorSupplier<String, Integer>());
 
         assertEquals(2 + // sources
             2 + // stream1

Reply via email to