Repository: kafka
Updated Branches:
  refs/heads/trunk 84d2b6a01 -> fe4a469fc


KAFKA-4830; Augment KStream.print() to allow users pass in extra parameters in 
the printed string

I extend `KStream#print()` to `KStream#print(KeyValueMapper<K, V, String>)`.
So I add the following methods :
1. `KStream#print(KeyValueMapper<K, V, String>)`
2. `KStream#print(KeyValueMapper<K, V, String>, String streamName)`
3. `KStream#print(KeyValueMapper<K, V, String>, Serde<K>, Serde<V>)`
4. `KStream#print(KeyValueMapper<K, V, String>, Serde<K>, Serde<V>, String 
streamName)`

Author: jameschien <jamesch...@staff.ruten.com.tw>
Author: jedichien <james.chain1...@gmail.com>
Author: JamesChien <jedich...@users.noreply.github.com>
Author: JamesChien <james.chain1...@gmail.com>

Reviewers: Bill Bejeck <bbe...@gmail.com>, Damian Guy <damian....@gmail.com>, 
Guozhang Wang <wangg...@gmail.com>

Closes #3085 from jedichien/KAFKA-4830


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fe4a469f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fe4a469f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fe4a469f

Branch: refs/heads/trunk
Commit: fe4a469fce5a47b89083781ed8a03d0d71428aeb
Parents: 84d2b6a
Author: jameschien <jamesch...@staff.ruten.com.tw>
Authored: Thu Jul 20 14:47:38 2017 +0100
Committer: Damian Guy <damian....@gmail.com>
Committed: Thu Jul 20 14:47:38 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KStream.java   | 228 ++++++++++++++++++-
 .../apache/kafka/streams/kstream/KTable.java    |  16 +-
 .../streams/kstream/PrintForeachAction.java     |  16 +-
 .../streams/kstream/internals/KStreamImpl.java  |  93 +++++---
 .../streams/kstream/internals/KTableImpl.java   |  42 ++--
 .../kstream/internals/KStreamPrintTest.java     |  46 +++-
 6 files changed, 374 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fe4a469f/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 9637927..191931b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -286,9 +286,9 @@ public interface KStream<K, V> {
      * Implementors will need to override {@code toString()} for keys and 
values that are not of type {@link String},
      * {@link Integer} etc. to get meaningful information.
      *
-     * @param streamName the name used to label the key/value pairs printed to 
the console
+     * @param label the name used to label the key/value pairs printed to the 
console
      */
-    void print(final String streamName);
+    void print(final String label);
 
     /**
      * Print the records of this stream to {@code System.out}.
@@ -318,11 +318,111 @@ public interface KStream<K, V> {
      *
      * @param keySerde   key serde used to deserialize key if type is {@code 
byte[]},
      * @param valSerde   value serde used to deserialize value if type is 
{@code byte[]},
-     * @param streamName the name used to label the key/value pairs printed to 
the console
+     * @param label the name used to label the key/value pairs printed to the 
console
      */
     void print(final Serde<K> keySerde,
                final Serde<V> valSerde,
-               final String streamName);
+               final String label);
+
+    /**
+     * Print the customized output with {@code System.out}.
+     * <p>
+     * The default serde will be use to deserialize key or value if type is 
{@code byte[]}.
+     * The user provided {@link KeyValueMapper} which customizes output is 
used to print with {@code System.out}
+     * <p>
+     * The example below shows the way to customize output data.
+     * <pre>{@code
+     * final KeyValueMapper<Integer, String, String> mapper = new 
KeyValueMapper<Integer, String, String>() {
+     *     @Override
+     *     public String apply(Integer key, String value) {
+     *         return String.format("(%d, %s)", key, value);
+     *     }
+     * };
+     * }</pre>
+     * <p>
+     * The KeyValueMapper's mapped value type must be {@code String}.
+     *
+     * @param mapper a {@link KeyValueMapper} that computes output type {@code 
String}.
+     */
+    void print(final KeyValueMapper<? super K, ? super V, String> mapper);
+
+    /**
+     * Print the customized output with {@code System.out}.
+     * <p>
+     * The default serde will be used to deserialize key or value if type is 
{@code byte[]}.
+     * The user provided {@link KeyValueMapper} which customizes output is 
used to print with {@code System.out}
+     * <p>
+     * The example below shows the way to customize output data.
+     * <pre>{@code
+     * final KeyValueMapper<Integer, String, String> mapper = new 
KeyValueMapper<Integer, String, String>() {
+     *     @Override
+     *     public String apply(Integer key, String value) {
+     *         return String.format("(%d, %s)", key, value);
+     *     }
+     * };
+     * }</pre>
+     * <p>
+     * The KeyValueMapper's mapped value type must be {@code String}.
+     *
+     * @param mapper a {@link KeyValueMapper} that computes output type {@code 
String}.
+     * @param label The given name which labels output will be printed.
+     */
+    void print(final KeyValueMapper<? super K, ? super V, String> mapper, 
final String label);
+
+    /**
+     * Print the customized output with {@code System.out}.
+     * <p>
+     * The user provided {@link KeyValueMapper} which customizes output is 
used to print with {@code System.out}
+     * The provided serde will be use to deserialize key or value if type is 
{@code byte[]}.
+     * <p>
+     * The example below shows the way to customize output data.
+     * <pre>{@code
+     * final KeyValueMapper<Integer, String, String> mapper = new 
KeyValueMapper<Integer, String, String>() {
+     *     @Override
+     *     public String apply(Integer key, String value) {
+     *         return String.format("(%d, %s)", key, value);
+     *     }
+     * };
+     * }</pre>
+     * <p>
+     * The provided KeyValueMapper's mapped value type must be {@code String}.
+     * <p>
+     * Implementors will need to override {@code toString()} for keys and 
values that are not of type {@link String},
+     * {@link Integer} etc. to get meaningful information.
+     *
+     * @param mapper a {@link KeyValueMapper} that computes output type {@code 
String}.
+     * @param keySerde a {@link Serde<K>} used to deserialize key if type is 
{@code byte[]}.
+     * @param valSerde a {@link Serde<V>} used to deserialize value if type is 
{@code byte[]}.
+     */
+    void print(final KeyValueMapper<? super K, ? super V, String> mapper, 
final Serde<K> keySerde, final Serde<V> valSerde);
+
+    /**
+     * Print the customized output with {@code System.out}.
+     * <p>
+     * The user provided {@link KeyValueMapper} which customizes output is 
used to print with {@code System.out}.
+     * The provided serde will be use to deserialize key or value if type is 
{@code byte[]}.
+     * <p>
+     * The example below shows the way to customize output data.
+     * <pre>{@code
+     * final KeyValueMapper<Integer, String, String> mapper = new 
KeyValueMapper<Integer, String, String>() {
+     *     @Override
+     *     public String apply(Integer key, String value) {
+     *         return String.format("(%d, %s)", key, value);
+     *     }
+     * };
+     * }</pre>
+     * <p>
+     * The provided KeyValueMapper's mapped value type must be {@code String}.
+     * <p>
+     * Implementors will need to override {@code toString()} for keys and 
values that are not of type {@link String},
+     * {@link Integer} etc. to get meaningful information.
+     *
+     * @param mapper a {@link KeyValueMapper} that computes output type {@code 
String}.
+     * @param keySerde a {@link Serde<K>} used to deserialize key if type is 
{@code byte[]}.
+     * @param valSerde a {@link Serde<V>} used to deserialize value if type is 
{@code byte[]}.
+     * @param label The given name which labels output will be printed.
+     */
+    void print(final KeyValueMapper<? super K, ? super V, String> mapper, 
final Serde<K> keySerde, final Serde<V> valSerde, final String label);
 
     /**
      * Write the records of this stream to a file at the given path.
@@ -350,10 +450,10 @@ public interface KStream<K, V> {
      * {@link Integer} etc. to get meaningful information.
      *
      * @param filePath   name of the file to write to
-     * @param streamName the name used to label the key/value pairs written to 
the file
+     * @param label the name used to label the key/value pairs written to the 
file
      */
     void writeAsText(final String filePath,
-                     final String streamName);
+                     final String label);
 
     /**
      * Write the records of this stream to a file at the given path.
@@ -385,16 +485,128 @@ public interface KStream<K, V> {
      * {@link Integer} etc. to get meaningful information.
      *
      * @param filePath   name of the file to write to
-     * @param streamName the name used to label the key/value pairs written to 
the file
+     * @param label the name used to label the key/value pairs written to the 
file
      * @param keySerde   key serde used to deserialize key if type is {@code 
byte[]},
      * @param valSerde   value serde used deserialize value if type is {@code 
byte[]},
      */
     void writeAsText(final String filePath,
-                     final String streamName,
+                     final String label,
                      final Serde<K> keySerde,
                      final Serde<V> valSerde);
 
     /**
+     * Write the customised output to a given file path.
+     * <p>
+     * The user provided {@link KeyValueMapper} which customizes output is 
used to write to file.
+     * This function will use default name of stream to label records.
+     * <p>
+     * The default key and value serde will used to deserialize {@code byte[]} 
records before calling {@code toString()}.
+     * <p>
+     * The example below shows the way to customize output data.
+     * <pre>{@code
+     * final KeyValueMapper<Integer, String, String> mapper = new 
KeyValueMapper<Integer, String, String>() {
+     *     @Override
+     *     public String apply(Integer key, String value) {
+     *         return String.format("(%d, %s)", key, value);
+     *     }
+     * };
+     * }</pre>
+     * <p>
+     * The KeyValueMapper's mapped value type must be {@code String}.
+     *
+     * @param filePath path of the file to write to.
+     * @param mapper a {@link KeyValueMapper} that computes output type {@code 
String}.
+     */
+    void writeAsText(final String filePath, final KeyValueMapper<? super K, ? 
super V, String> mapper);
+
+    /**
+     * Write the customised output to a given file path.
+     * <p>
+     * The user provided {@link KeyValueMapper} which customizes output is 
used to write to file.
+     * This function will use given name of stream to label records.
+     * <p>
+     * The default key and value serde will used to deserialize {@code byte[]} 
records before calling {@code toString()}.
+     * <p>
+     * The example below shows the way to customize output data.
+     * <pre>{@code
+     * final KeyValueMapper<Integer, String, String> mapper = new 
KeyValueMapper<Integer, String, String>() {
+     *     @Override
+     *     public String apply(Integer key, String value) {
+     *         return String.format("(%d, %s)", key, value);
+     *     }
+     * };
+     * }</pre>
+     * <p>
+     * The KeyValueMapper's mapped value type must be {@code String}.
+     *
+     * @param filePath path of the file to write to.
+     * @param label the name used to label records written to file.
+     * @param mapper a {@link KeyValueMapper} that computes output type {@code 
String}.
+     */
+    void writeAsText(final String filePath, final String label, final 
KeyValueMapper<? super K, ? super V, String> mapper);
+
+    /**
+     * Write the customised output to a given file path.
+     * <p>
+     * The user provided {@link KeyValueMapper} which customizes output is 
used to write to file.
+     * This function will use default name of stream to label records.
+     * <p>
+     * The given key and value serde will be used to deserialize {@code 
byte[]} records before calling {@code toString()}.
+     * <p>
+     * The example below shows the way to customize output data.
+     * <pre>{@code
+     * final KeyValueMapper<Integer, String, String> mapper = new 
KeyValueMapper<Integer, String, String>() {
+     *     @Override
+     *     public String apply(Integer key, String value) {
+     *         return String.format("(%d, %s)", key, value);
+     *     }
+     * };
+     * }</pre>
+     * <p>
+     * The KeyValueMapper's mapped value type must be {@code String}.
+     * <p>
+     * Implementors will need to override {@code toString()} for keys and 
values that are not of type {@link String},
+     * {@link Integer} etc. to get meaningful information.
+     *
+     * @param filePath path of the file to write to.
+     * @param keySerde key serde used to deserialize key if type is {@code 
byte[]}.
+     * @param valSerde value serde used to deserialize value if type is {@code 
byte[]}.
+     * @param mapper a {@link KeyValueMapper} that computes output type {@code 
String}.
+     */
+    void writeAsText(final String filePath, final Serde<K> keySerde, final 
Serde<V> valSerde, final KeyValueMapper<? super K, ? super V, String> mapper);
+
+    /**
+     * Write the customised output to a given file path.
+     * <p>
+     * The user provided {@link KeyValueMapper} which customizes output is 
used to write to file.
+     * This function will use given name of stream to label records.
+     * <p>
+     * The given key and value serde will be used to deserialize {@code 
byte[]} records before calling {@code toString()}.
+     * <p>
+     * The example below shows the way to customize output data.
+     * <pre>{@code
+     * final KeyValueMapper<Integer, String, String> mapper = new 
KeyValueMapper<Integer, String, String>() {
+     *     @Override
+     *     public String apply(Integer key, String value) {
+     *         return String.format("(%d, %s)", key, value);
+     *     }
+     * };
+     * }</pre>
+     * <p>
+     * The KeyValueMapper's mapped value type must be {@code String}.
+     * <p>
+     * Implementors will need to override {@code toString()} for keys and 
values that are not of type {@link String},
+     * {@link Integer} etc. to get meaningful information.
+     *
+     * @param filePath path of the file to write to.
+     * @param label the name used to label records written to file.
+     * @param keySerde key serde used to deserialize key if type is {@code 
byte[]}.
+     * @param valSerde value serde used to deserialize value if type is {@code 
byte[]}.
+     * @param mapper a {@link KeyValueMapper} that computes output type {@code 
String}.
+     */
+    void writeAsText(final String filePath, final String label, final Serde<K> 
keySerde, final Serde<V> valSerde, final KeyValueMapper<? super K, ? super V, 
String> mapper);
+
+    /**
      * Perform an action on each record of {@code KStream}.
      * This is a stateless record-by-record operation (cf. {@link 
#process(ProcessorSupplier, String...)}).
      * Note that this is a terminal operation that returns void.

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe4a469f/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 7f8ab6a..b7dd43e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -415,13 +415,13 @@ public interface KTable<K, V> {
      * Note that {@code print()} is not applied to the internal state store 
and only called for each new {@code KTable}
      * update record.
      *
-     * @param streamName the name used to label the key/value pairs printed to 
the console
+     * @param label the name used to label the key/value pairs printed to the 
console
      * @deprecated Use the Interactive Queries APIs (e.g., {@link 
KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the 
keys of a KTable. Alternatively
      * convert to a KStream using {@code toStream()} and then use {@link 
KStream#print(String)} on the result.
      */
     @Deprecated
-    void print(final String streamName);
+    void print(final String label);
 
     /**
      * Print the update records of this {@code KTable} to {@code System.out}.
@@ -462,7 +462,7 @@ public interface KTable<K, V> {
      *
      * @param keySerde   key serde used to deserialize key if type is {@code 
byte[]},
      * @param valSerde   value serde used to deserialize value if type is 
{@code byte[]},
-     * @param streamName the name used to label the key/value pairs printed to 
the console
+     * @param label the name used to label the key/value pairs printed to the 
console
      * @deprecated Use the Interactive Queries APIs (e.g., {@link 
KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the 
keys of a KTable. Alternatively
      * convert to a KStream using {@code toStream()} and then use {@link 
KStream#print(Serde, Serde, String)} on the result.
@@ -470,7 +470,7 @@ public interface KTable<K, V> {
     @Deprecated
     void print(final Serde<K> keySerde,
                final Serde<V> valSerde,
-               final String streamName);
+               final String label);
 
     /**
      * Write the update records of this {@code KTable} to a file at the given 
path.
@@ -508,14 +508,14 @@ public interface KTable<K, V> {
      * {@code KTable} update record.
      *
      * @param filePath   name of file to write to
-     * @param streamName the name used to label the key/value pairs printed 
out to the console
+     * @param label the name used to label the key/value pairs printed out to 
the console
      * @deprecated Use the Interactive Queries APIs (e.g., {@link 
KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the 
keys of a KTable. Alternatively
      * convert to a KStream using {@code toStream()} and then use {@link 
KStream#writeAsText(String, String)}} on the result.
      */
     @Deprecated
     void writeAsText(final String filePath,
-                     final String streamName);
+                     final String label);
 
     /**
      * Write the update records of this {@code KTable} to a file at the given 
path.
@@ -557,7 +557,7 @@ public interface KTable<K, V> {
      * {@code KTable} update record.
      *
      * @param filePath name of file to write to
-     * @param streamName the name used to label the key/value pairs printed to 
the console
+     * @param label the name used to label the key/value pairs printed to the 
console
      * @param keySerde key serde used to deserialize key if type is {@code 
byte[]},
      * @param valSerde value serde used to deserialize value if type is {@code 
byte[]}
      * @deprecated Use the Interactive Queries APIs (e.g., {@link 
KafkaStreams#store(String, QueryableStoreType) }
@@ -567,7 +567,7 @@ public interface KTable<K, V> {
      */
     @Deprecated
     void writeAsText(final String filePath,
-                     final String streamName,
+                     final String label,
                      final Serde<K> keySerde,
                      final Serde<V> valSerde);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe4a469f/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
index 3eb6d80..5e8ec28 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
@@ -20,27 +20,29 @@ import java.io.PrintWriter;
 
 public class PrintForeachAction<K, V> implements ForeachAction<K, V> {
 
-    private final String streamName;
+    private final String label;
     private final PrintWriter printWriter;
-    
+    private final KeyValueMapper<? super K, ? super V, String> mapper;
     /**
-     * Print data message with given writer. The PrintWriter can be null in 
order to
+     * Print customized output with given writer. The PrintWriter can be null 
in order to
      * distinguish between {@code System.out} and the others. If the 
PrintWriter is {@code PrintWriter(System.out)},
      * then it would close {@code System.out} output stream.
      * <p>
      * Afterall, not to pass in {@code PrintWriter(System.out)} but {@code 
null} instead.
      *
      * @param printWriter Use {@code System.out.println} if {@code null}.
-     * @param streamName The given name will be printed.
+     * @param mapper The mapper which can allow user to customize output will 
be printed.
+     * @param label The given name will be printed.
      */
-    public PrintForeachAction(final PrintWriter printWriter, final String 
streamName) {
+    public PrintForeachAction(final PrintWriter printWriter, final 
KeyValueMapper<? super K, ? super V, String> mapper, final String label) {
         this.printWriter = printWriter;
-        this.streamName = streamName;
+        this.mapper = mapper;
+        this.label = label;
     }
 
     @Override
     public void apply(final K key, final V value) {
-        final String data = String.format("[%s]: %s, %s", streamName, key, 
value);
+        final String data = String.format("[%s]: %s", label, mapper.apply(key, 
value));
         if (printWriter == null) {
             System.out.println(data);
         } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe4a469f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 9cf8b38..ba537c0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -103,6 +103,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
 
     public static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
 
+    private final KeyValueMapper<K, V, String> defaultKeyValueMapper;
 
     private final boolean repartitionRequired;
 
@@ -110,6 +111,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
                        boolean repartitionRequired) {
         super(topology, name, sourceNodes);
         this.repartitionRequired = repartitionRequired;
+        this.defaultKeyValueMapper = new KeyValueMapper<K, V, String>() {
+            @Override
+            public String apply(K key, V value) {
+                return String.format("%s, %s", key, value);
+            }
+        };
     }
 
     @Override
@@ -178,63 +185,98 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
 
     @Override
     public void print() {
-        print(null, null, null);
+        print(defaultKeyValueMapper, null, null, this.name);
+    }
+
+    @Override
+    public void print(final String label) {
+        print(defaultKeyValueMapper, null, null, label);
     }
 
     @Override
-    public void print(String streamName) {
-        print(null, null, streamName);
+    public void print(final Serde<K> keySerde, final Serde<V> valSerde) {
+        print(defaultKeyValueMapper, keySerde, valSerde, this.name);
     }
 
     @Override
-    public void print(Serde<K> keySerde, Serde<V> valSerde) {
-        print(keySerde, valSerde, null);
+    public void print(final Serde<K> keySerde, final Serde<V> valSerde, final 
String label) {
+        print(defaultKeyValueMapper, keySerde, valSerde, label);
     }
 
     @Override
-    public void print(Serde<K> keySerde, Serde<V> valSerde, String streamName) 
{
+    public void print(final KeyValueMapper<? super K, ? super V, String> 
mapper) {
+        print(mapper, null, null, this.name);
+    }
+
+    @Override
+    public void print(final KeyValueMapper<? super K, ? super V, String> 
mapper, final String label) {
+        print(mapper, null, null, label);
+    }
+
+    @Override
+    public void print(final KeyValueMapper<? super K, ? super V, String> 
mapper, final Serde<K> keySerde, final Serde<V> valSerde) {
+        print(mapper, keySerde, valSerde, this.name);
+    }
+
+    @Override
+    public void print(KeyValueMapper<? super K, ? super V, String> mapper, 
final Serde<K> keySerde, Serde<V> valSerde, final String label) {
+        Objects.requireNonNull(mapper, "mapper can't be null");
+        Objects.requireNonNull(label, "label can't be null");
         String name = topology.newName(PRINTING_NAME);
-        streamName = (streamName == null) ? this.name : streamName;
-        topology.addProcessor(name, new KStreamPrint<>(new 
PrintForeachAction(null, streamName), keySerde, valSerde), this.name);
+        topology.addProcessor(name, new KStreamPrint<>(new 
PrintForeachAction<>(null, mapper, label), keySerde, valSerde), this.name);
     }
 
+    @Override
+    public void writeAsText(final String filePath) {
+        writeAsText(filePath, this.name, null, null, defaultKeyValueMapper);
+    }
 
     @Override
-    public void writeAsText(String filePath) {
-        writeAsText(filePath, null, null, null);
+    public void writeAsText(final String filePath, final String label) {
+        writeAsText(filePath, label, null, null, defaultKeyValueMapper);
     }
 
     @Override
-    public void writeAsText(String filePath, String streamName) {
-        writeAsText(filePath, streamName, null, null);
+    public void writeAsText(final String filePath, final Serde<K> keySerde, 
final Serde<V> valSerde) {
+        writeAsText(filePath, this.name, keySerde, valSerde, 
defaultKeyValueMapper);
     }
 
+    @Override
+    public void writeAsText(final String filePath, final String label, final 
Serde<K> keySerde, final Serde<V> valSerde) {
+        writeAsText(filePath, label, keySerde, valSerde, 
defaultKeyValueMapper);
+    }
 
     @Override
-    public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> 
valSerde) {
-        writeAsText(filePath, null, keySerde, valSerde);
+    public void writeAsText(final String filePath, final KeyValueMapper<? 
super K, ? super V, String> mapper) {
+        writeAsText(filePath, this.name, null, null, mapper);
+    }
+
+    @Override
+    public void writeAsText(final String filePath, final String label, final 
KeyValueMapper<? super K, ? super V, String> mapper) {
+        writeAsText(filePath, label, null, null, mapper);
+    }
+
+    @Override
+    public void writeAsText(final String filePath, final Serde<K> keySerde, 
final Serde<V> valSerde, final KeyValueMapper<? super K, ? super V, String> 
mapper) {
+        writeAsText(filePath, this.name, keySerde, valSerde, mapper);
     }
 
-    /**
-     * @throws TopologyBuilderException if file is not found
-     */
     @Override
-    public void writeAsText(String filePath, String streamName, Serde<K> 
keySerde, Serde<V> valSerde) {
+    public void writeAsText(final String filePath, final String label, final 
Serde<K> keySerde, final Serde<V> valSerde, KeyValueMapper<? super K, ? super 
V, String> mapper) {
         Objects.requireNonNull(filePath, "filePath can't be null");
+        Objects.requireNonNull(label, "label can't be null");
+        Objects.requireNonNull(mapper, "mapper can't be null");
         if (filePath.trim().isEmpty()) {
             throw new TopologyBuilderException("filePath can't be an empty 
string");
         }
-        String name = topology.newName(PRINTING_NAME);
-        streamName = (streamName == null) ? this.name : streamName;
+        final String name = topology.newName(PRINTING_NAME);
         try {
-            PrintWriter printWriter = null;
-            printWriter = new PrintWriter(filePath, 
StandardCharsets.UTF_8.name());
-            topology.addProcessor(name, new KStreamPrint<>(new 
PrintForeachAction(printWriter, streamName), keySerde, valSerde), this.name);
+            PrintWriter printWriter = new PrintWriter(filePath, 
StandardCharsets.UTF_8.name());
+            topology.addProcessor(name, new KStreamPrint<>(new 
PrintForeachAction<>(printWriter, mapper, label), keySerde, valSerde), 
this.name);
         } catch (FileNotFoundException | UnsupportedEncodingException e) {
             String message = "Unable to write stream to file at [" + filePath 
+ "] " + e.getMessage();
             throw new TopologyBuilderException(message);
         }
-
     }
 
     @Override
@@ -678,9 +720,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
                                         this.repartitionRequired);
     }
 
-
-
-
     private static <K, V> StateStoreSupplier createWindowedStateStore(final 
JoinWindows windows,
                                                                      final 
Serde<K> keySerde,
                                                                      final 
Serde<V> valueSerde,

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe4a469f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 912f42c..679efe5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -75,6 +75,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> 
implements KTable<K,
 
     private final ProcessorSupplier<?, ?> processorSupplier;
 
+    private final KeyValueMapper<K, V, String> defaultKeyValueMapper;
+
     private final String queryableStoreName;
     private final boolean isQueryable;
 
@@ -94,6 +96,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> 
implements KTable<K,
         this.keySerde = null;
         this.valSerde = null;
         this.isQueryable = isQueryable;
+        this.defaultKeyValueMapper = new KeyValueMapper<K, V, String>() {
+            @Override
+            public String apply(K key, V value) {
+                return String.format("%s, %s", key, value);
+            }
+        };
     }
 
     public KTableImpl(KStreamBuilder topology,
@@ -110,6 +118,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> 
implements KTable<K,
         this.keySerde = keySerde;
         this.valSerde = valSerde;
         this.isQueryable = isQueryable;
+        this.defaultKeyValueMapper = new KeyValueMapper<K, V, String>() {
+            @Override
+            public String apply(K key, V value) {
+                return String.format("%s, %s", key, value);
+            }
+        };
     }
 
     @Override
@@ -226,56 +240,56 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K> implements KTable<K,
 
     @Override
     public void print() {
-        print(null, null, null);
+        print(null, null, this.name);
     }
 
     @Override
-    public void print(String streamName) {
-        print(null, null, streamName);
+    public void print(String label) {
+        print(null, null, label);
     }
 
     @Override
     public void print(Serde<K> keySerde, Serde<V> valSerde) {
-        print(keySerde, valSerde, null);
+        print(keySerde, valSerde, this.name);
     }
 
 
     @Override
-    public void print(Serde<K> keySerde, Serde<V> valSerde, String streamName) 
{
+    public void print(Serde<K> keySerde, final Serde<V> valSerde, String 
label) {
+        Objects.requireNonNull(label, "label can't be null");
         String name = topology.newName(PRINTING_NAME);
-        streamName = (streamName == null) ? this.name : streamName;
-        topology.addProcessor(name, new KStreamPrint<>(new 
PrintForeachAction(null, streamName), keySerde, valSerde), this.name);
+        topology.addProcessor(name, new KStreamPrint<>(new 
PrintForeachAction(null, defaultKeyValueMapper, label), keySerde, valSerde), 
this.name);
     }
 
     @Override
     public void writeAsText(String filePath) {
-        writeAsText(filePath, null, null, null);
+        writeAsText(filePath, this.name, null, null);
     }
 
     @Override
-    public void writeAsText(String filePath, String streamName) {
-        writeAsText(filePath, streamName, null, null);
+    public void writeAsText(String filePath, String label) {
+        writeAsText(filePath, label, null, null);
     }
 
     @Override
     public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> 
valSerde) {
-        writeAsText(filePath, null, keySerde, valSerde);
+        writeAsText(filePath, this.name, keySerde, valSerde);
     }
 
     /**
      * @throws TopologyBuilderException if file is not found
      */
     @Override
-    public void writeAsText(String filePath, String streamName, Serde<K> 
keySerde, Serde<V> valSerde) {
+    public void writeAsText(String filePath, String label, Serde<K> keySerde, 
Serde<V> valSerde) {
         Objects.requireNonNull(filePath, "filePath can't be null");
+        Objects.requireNonNull(label, "label can't be null");
         if (filePath.trim().isEmpty()) {
             throw new TopologyBuilderException("filePath can't be an empty 
string");
         }
         String name = topology.newName(PRINTING_NAME);
-        streamName = (streamName == null) ? this.name : streamName;
         try {
             PrintWriter printWriter = new PrintWriter(filePath, 
StandardCharsets.UTF_8.name());
-            topology.addProcessor(name, new KStreamPrint<>(new 
PrintForeachAction(printWriter, streamName), keySerde, valSerde), this.name);
+            topology.addProcessor(name, new KStreamPrint<>(new 
PrintForeachAction(printWriter, defaultKeyValueMapper, label), keySerde, 
valSerde), this.name);
         } catch (FileNotFoundException | UnsupportedEncodingException e) {
             String message = "Unable to write stream to file at [" + filePath 
+ "] " + e.getMessage();
             throw new TopologyBuilderException(message);

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe4a469f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
index c537e0a..c94b868 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.PrintForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -63,7 +64,13 @@ public class KStreamPrintTest {
     
     @Test
     public void testPrintKeyValueWithName() {
-        final KStreamPrint<Integer, String> kStreamPrint = new 
KStreamPrint<>(new PrintForeachAction(printWriter, "test-stream"), intSerd, 
stringSerd);
+        KeyValueMapper<Integer, String, String> mapper = new 
KeyValueMapper<Integer, String, String>() {
+            @Override
+            public String apply(Integer key, String value) {
+                return String.format("%d, %s", key, value);
+            }
+        };
+        final KStreamPrint<Integer, String> kStreamPrint = new 
KStreamPrint<>(new PrintForeachAction<>(printWriter, mapper, "test-stream"), 
intSerd, stringSerd);
 
         final List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
                 new KeyValue<>(0, "zero"),
@@ -82,9 +89,42 @@ public class KStreamPrintTest {
             driver.process(topicName, record.key, record.value);
         }
         printWriter.flush();
-        final String[] flushOutDatas = new String(byteOutStream.toByteArray(), 
Charset.forName("UTF-8")).split("\n");
+        final String[] flushOutDatas = new String(byteOutStream.toByteArray(), 
Charset.forName("UTF-8")).split("\\r*\\n");
         for (int i = 0; i < flushOutDatas.length; i++) {
-            assertEquals(flushOutDatas[i], expectedResult[i]);
+            assertEquals(expectedResult[i], flushOutDatas[i]);
+        }
+    }
+
+    @Test
+    public void testPrintStreamWithProvidedKeyValueMapper() {
+        final KeyValueMapper<Integer, String, String> mapper = new 
KeyValueMapper<Integer, String, String>() {
+            @Override
+            public String apply(Integer key, String value) {
+                return String.format("(%d, %s)", key, value);
+            }
+        };
+        final KStreamPrint<Integer, String> kStreamPrint = new 
KStreamPrint<>(new PrintForeachAction<>(printWriter, mapper, "test-stream"), 
intSerd, stringSerd);
+
+        final List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
+                new KeyValue<>(0, "zero"),
+                new KeyValue<>(1, "one"),
+                new KeyValue<>(2, "two"),
+                new KeyValue<>(3, "three"));
+
+        final String[] expectedResult = {"[test-stream]: (0, zero)", 
"[test-stream]: (1, one)", "[test-stream]: (2, two)", "[test-stream]: (3, 
three)"};
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<Integer, String> stream = builder.stream(intSerd, 
stringSerd, topicName);
+        stream.process(kStreamPrint);
+
+        driver = new KStreamTestDriver(builder);
+        for (KeyValue<Integer, String> record: inputRecords) {
+            driver.process(topicName, record.key, record.value);
+        }
+        printWriter.flush();
+        final String[] results = new String(byteOutStream.toByteArray(), 
Charset.forName("UTF-8")).split("\\r*\\n");
+        for (int i = 0; i < results.length; i++) {
+            assertEquals(expectedResult[i], results[i]);
         }
     }
 

Reply via email to