Repository: kafka Updated Branches: refs/heads/trunk 84d2b6a01 -> fe4a469fc
KAFKA-4830; Augment KStream.print() to allow users pass in extra parameters in the printed string I extend `KStream#print()` to `KStream#print(KeyValueMapper<K, V, String>)`. So I add the following methods : 1. `KStream#print(KeyValueMapper<K, V, String>)` 2. `KStream#print(KeyValueMapper<K, V, String>, String streamName)` 3. `KStream#print(KeyValueMapper<K, V, String>, Serde<K>, Serde<V>)` 4. `KStream#print(KeyValueMapper<K, V, String>, Serde<K>, Serde<V>, String streamName)` Author: jameschien <jamesch...@staff.ruten.com.tw> Author: jedichien <james.chain1...@gmail.com> Author: JamesChien <jedich...@users.noreply.github.com> Author: JamesChien <james.chain1...@gmail.com> Reviewers: Bill Bejeck <bbe...@gmail.com>, Damian Guy <damian....@gmail.com>, Guozhang Wang <wangg...@gmail.com> Closes #3085 from jedichien/KAFKA-4830 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fe4a469f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fe4a469f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fe4a469f Branch: refs/heads/trunk Commit: fe4a469fce5a47b89083781ed8a03d0d71428aeb Parents: 84d2b6a Author: jameschien <jamesch...@staff.ruten.com.tw> Authored: Thu Jul 20 14:47:38 2017 +0100 Committer: Damian Guy <damian....@gmail.com> Committed: Thu Jul 20 14:47:38 2017 +0100 ---------------------------------------------------------------------- .../apache/kafka/streams/kstream/KStream.java | 228 ++++++++++++++++++- .../apache/kafka/streams/kstream/KTable.java | 16 +- .../streams/kstream/PrintForeachAction.java | 16 +- .../streams/kstream/internals/KStreamImpl.java | 93 +++++--- .../streams/kstream/internals/KTableImpl.java | 42 ++-- .../kstream/internals/KStreamPrintTest.java | 46 +++- 6 files changed, 374 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/fe4a469f/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 9637927..191931b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -286,9 +286,9 @@ public interface KStream<K, V> { * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String}, * {@link Integer} etc. to get meaningful information. * - * @param streamName the name used to label the key/value pairs printed to the console + * @param label the name used to label the key/value pairs printed to the console */ - void print(final String streamName); + void print(final String label); /** * Print the records of this stream to {@code System.out}. @@ -318,11 +318,111 @@ public interface KStream<K, V> { * * @param keySerde key serde used to deserialize key if type is {@code byte[]}, * @param valSerde value serde used to deserialize value if type is {@code byte[]}, - * @param streamName the name used to label the key/value pairs printed to the console + * @param label the name used to label the key/value pairs printed to the console */ void print(final Serde<K> keySerde, final Serde<V> valSerde, - final String streamName); + final String label); + + /** + * Print the customized output with {@code System.out}. + * <p> + * The default serde will be use to deserialize key or value if type is {@code byte[]}. + * The user provided {@link KeyValueMapper} which customizes output is used to print with {@code System.out} + * <p> + * The example below shows the way to customize output data. + * <pre>{@code + * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() { + * @Override + * public String apply(Integer key, String value) { + * return String.format("(%d, %s)", key, value); + * } + * }; + * }</pre> + * <p> + * The KeyValueMapper's mapped value type must be {@code String}. + * + * @param mapper a {@link KeyValueMapper} that computes output type {@code String}. + */ + void print(final KeyValueMapper<? super K, ? super V, String> mapper); + + /** + * Print the customized output with {@code System.out}. + * <p> + * The default serde will be used to deserialize key or value if type is {@code byte[]}. + * The user provided {@link KeyValueMapper} which customizes output is used to print with {@code System.out} + * <p> + * The example below shows the way to customize output data. + * <pre>{@code + * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() { + * @Override + * public String apply(Integer key, String value) { + * return String.format("(%d, %s)", key, value); + * } + * }; + * }</pre> + * <p> + * The KeyValueMapper's mapped value type must be {@code String}. + * + * @param mapper a {@link KeyValueMapper} that computes output type {@code String}. + * @param label The given name which labels output will be printed. + */ + void print(final KeyValueMapper<? super K, ? super V, String> mapper, final String label); + + /** + * Print the customized output with {@code System.out}. + * <p> + * The user provided {@link KeyValueMapper} which customizes output is used to print with {@code System.out} + * The provided serde will be use to deserialize key or value if type is {@code byte[]}. + * <p> + * The example below shows the way to customize output data. + * <pre>{@code + * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() { + * @Override + * public String apply(Integer key, String value) { + * return String.format("(%d, %s)", key, value); + * } + * }; + * }</pre> + * <p> + * The provided KeyValueMapper's mapped value type must be {@code String}. + * <p> + * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String}, + * {@link Integer} etc. to get meaningful information. + * + * @param mapper a {@link KeyValueMapper} that computes output type {@code String}. + * @param keySerde a {@link Serde<K>} used to deserialize key if type is {@code byte[]}. + * @param valSerde a {@link Serde<V>} used to deserialize value if type is {@code byte[]}. + */ + void print(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde); + + /** + * Print the customized output with {@code System.out}. + * <p> + * The user provided {@link KeyValueMapper} which customizes output is used to print with {@code System.out}. + * The provided serde will be use to deserialize key or value if type is {@code byte[]}. + * <p> + * The example below shows the way to customize output data. + * <pre>{@code + * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() { + * @Override + * public String apply(Integer key, String value) { + * return String.format("(%d, %s)", key, value); + * } + * }; + * }</pre> + * <p> + * The provided KeyValueMapper's mapped value type must be {@code String}. + * <p> + * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String}, + * {@link Integer} etc. to get meaningful information. + * + * @param mapper a {@link KeyValueMapper} that computes output type {@code String}. + * @param keySerde a {@link Serde<K>} used to deserialize key if type is {@code byte[]}. + * @param valSerde a {@link Serde<V>} used to deserialize value if type is {@code byte[]}. + * @param label The given name which labels output will be printed. + */ + void print(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde, final String label); /** * Write the records of this stream to a file at the given path. @@ -350,10 +450,10 @@ public interface KStream<K, V> { * {@link Integer} etc. to get meaningful information. * * @param filePath name of the file to write to - * @param streamName the name used to label the key/value pairs written to the file + * @param label the name used to label the key/value pairs written to the file */ void writeAsText(final String filePath, - final String streamName); + final String label); /** * Write the records of this stream to a file at the given path. @@ -385,16 +485,128 @@ public interface KStream<K, V> { * {@link Integer} etc. to get meaningful information. * * @param filePath name of the file to write to - * @param streamName the name used to label the key/value pairs written to the file + * @param label the name used to label the key/value pairs written to the file * @param keySerde key serde used to deserialize key if type is {@code byte[]}, * @param valSerde value serde used deserialize value if type is {@code byte[]}, */ void writeAsText(final String filePath, - final String streamName, + final String label, final Serde<K> keySerde, final Serde<V> valSerde); /** + * Write the customised output to a given file path. + * <p> + * The user provided {@link KeyValueMapper} which customizes output is used to write to file. + * This function will use default name of stream to label records. + * <p> + * The default key and value serde will used to deserialize {@code byte[]} records before calling {@code toString()}. + * <p> + * The example below shows the way to customize output data. + * <pre>{@code + * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() { + * @Override + * public String apply(Integer key, String value) { + * return String.format("(%d, %s)", key, value); + * } + * }; + * }</pre> + * <p> + * The KeyValueMapper's mapped value type must be {@code String}. + * + * @param filePath path of the file to write to. + * @param mapper a {@link KeyValueMapper} that computes output type {@code String}. + */ + void writeAsText(final String filePath, final KeyValueMapper<? super K, ? super V, String> mapper); + + /** + * Write the customised output to a given file path. + * <p> + * The user provided {@link KeyValueMapper} which customizes output is used to write to file. + * This function will use given name of stream to label records. + * <p> + * The default key and value serde will used to deserialize {@code byte[]} records before calling {@code toString()}. + * <p> + * The example below shows the way to customize output data. + * <pre>{@code + * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() { + * @Override + * public String apply(Integer key, String value) { + * return String.format("(%d, %s)", key, value); + * } + * }; + * }</pre> + * <p> + * The KeyValueMapper's mapped value type must be {@code String}. + * + * @param filePath path of the file to write to. + * @param label the name used to label records written to file. + * @param mapper a {@link KeyValueMapper} that computes output type {@code String}. + */ + void writeAsText(final String filePath, final String label, final KeyValueMapper<? super K, ? super V, String> mapper); + + /** + * Write the customised output to a given file path. + * <p> + * The user provided {@link KeyValueMapper} which customizes output is used to write to file. + * This function will use default name of stream to label records. + * <p> + * The given key and value serde will be used to deserialize {@code byte[]} records before calling {@code toString()}. + * <p> + * The example below shows the way to customize output data. + * <pre>{@code + * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() { + * @Override + * public String apply(Integer key, String value) { + * return String.format("(%d, %s)", key, value); + * } + * }; + * }</pre> + * <p> + * The KeyValueMapper's mapped value type must be {@code String}. + * <p> + * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String}, + * {@link Integer} etc. to get meaningful information. + * + * @param filePath path of the file to write to. + * @param keySerde key serde used to deserialize key if type is {@code byte[]}. + * @param valSerde value serde used to deserialize value if type is {@code byte[]}. + * @param mapper a {@link KeyValueMapper} that computes output type {@code String}. + */ + void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V> valSerde, final KeyValueMapper<? super K, ? super V, String> mapper); + + /** + * Write the customised output to a given file path. + * <p> + * The user provided {@link KeyValueMapper} which customizes output is used to write to file. + * This function will use given name of stream to label records. + * <p> + * The given key and value serde will be used to deserialize {@code byte[]} records before calling {@code toString()}. + * <p> + * The example below shows the way to customize output data. + * <pre>{@code + * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() { + * @Override + * public String apply(Integer key, String value) { + * return String.format("(%d, %s)", key, value); + * } + * }; + * }</pre> + * <p> + * The KeyValueMapper's mapped value type must be {@code String}. + * <p> + * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String}, + * {@link Integer} etc. to get meaningful information. + * + * @param filePath path of the file to write to. + * @param label the name used to label records written to file. + * @param keySerde key serde used to deserialize key if type is {@code byte[]}. + * @param valSerde value serde used to deserialize value if type is {@code byte[]}. + * @param mapper a {@link KeyValueMapper} that computes output type {@code String}. + */ + void writeAsText(final String filePath, final String label, final Serde<K> keySerde, final Serde<V> valSerde, final KeyValueMapper<? super K, ? super V, String> mapper); + + /** * Perform an action on each record of {@code KStream}. * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)}). * Note that this is a terminal operation that returns void. http://git-wip-us.apache.org/repos/asf/kafka/blob/fe4a469f/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 7f8ab6a..b7dd43e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -415,13 +415,13 @@ public interface KTable<K, V> { * Note that {@code print()} is not applied to the internal state store and only called for each new {@code KTable} * update record. * - * @param streamName the name used to label the key/value pairs printed to the console + * @param label the name used to label the key/value pairs printed to the console * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively * convert to a KStream using {@code toStream()} and then use {@link KStream#print(String)} on the result. */ @Deprecated - void print(final String streamName); + void print(final String label); /** * Print the update records of this {@code KTable} to {@code System.out}. @@ -462,7 +462,7 @@ public interface KTable<K, V> { * * @param keySerde key serde used to deserialize key if type is {@code byte[]}, * @param valSerde value serde used to deserialize value if type is {@code byte[]}, - * @param streamName the name used to label the key/value pairs printed to the console + * @param label the name used to label the key/value pairs printed to the console * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively * convert to a KStream using {@code toStream()} and then use {@link KStream#print(Serde, Serde, String)} on the result. @@ -470,7 +470,7 @@ public interface KTable<K, V> { @Deprecated void print(final Serde<K> keySerde, final Serde<V> valSerde, - final String streamName); + final String label); /** * Write the update records of this {@code KTable} to a file at the given path. @@ -508,14 +508,14 @@ public interface KTable<K, V> { * {@code KTable} update record. * * @param filePath name of file to write to - * @param streamName the name used to label the key/value pairs printed out to the console + * @param label the name used to label the key/value pairs printed out to the console * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String, String)}} on the result. */ @Deprecated void writeAsText(final String filePath, - final String streamName); + final String label); /** * Write the update records of this {@code KTable} to a file at the given path. @@ -557,7 +557,7 @@ public interface KTable<K, V> { * {@code KTable} update record. * * @param filePath name of file to write to - * @param streamName the name used to label the key/value pairs printed to the console + * @param label the name used to label the key/value pairs printed to the console * @param keySerde key serde used to deserialize key if type is {@code byte[]}, * @param valSerde value serde used to deserialize value if type is {@code byte[]} * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } @@ -567,7 +567,7 @@ public interface KTable<K, V> { */ @Deprecated void writeAsText(final String filePath, - final String streamName, + final String label, final Serde<K> keySerde, final Serde<V> valSerde); http://git-wip-us.apache.org/repos/asf/kafka/blob/fe4a469f/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java b/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java index 3eb6d80..5e8ec28 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java @@ -20,27 +20,29 @@ import java.io.PrintWriter; public class PrintForeachAction<K, V> implements ForeachAction<K, V> { - private final String streamName; + private final String label; private final PrintWriter printWriter; - + private final KeyValueMapper<? super K, ? super V, String> mapper; /** - * Print data message with given writer. The PrintWriter can be null in order to + * Print customized output with given writer. The PrintWriter can be null in order to * distinguish between {@code System.out} and the others. If the PrintWriter is {@code PrintWriter(System.out)}, * then it would close {@code System.out} output stream. * <p> * Afterall, not to pass in {@code PrintWriter(System.out)} but {@code null} instead. * * @param printWriter Use {@code System.out.println} if {@code null}. - * @param streamName The given name will be printed. + * @param mapper The mapper which can allow user to customize output will be printed. + * @param label The given name will be printed. */ - public PrintForeachAction(final PrintWriter printWriter, final String streamName) { + public PrintForeachAction(final PrintWriter printWriter, final KeyValueMapper<? super K, ? super V, String> mapper, final String label) { this.printWriter = printWriter; - this.streamName = streamName; + this.mapper = mapper; + this.label = label; } @Override public void apply(final K key, final V value) { - final String data = String.format("[%s]: %s, %s", streamName, key, value); + final String data = String.format("[%s]: %s", label, mapper.apply(key, value)); if (printWriter == null) { System.out.println(data); } else { http://git-wip-us.apache.org/repos/asf/kafka/blob/fe4a469f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 9cf8b38..ba537c0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -103,6 +103,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V public static final String REPARTITION_TOPIC_SUFFIX = "-repartition"; + private final KeyValueMapper<K, V, String> defaultKeyValueMapper; private final boolean repartitionRequired; @@ -110,6 +111,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V boolean repartitionRequired) { super(topology, name, sourceNodes); this.repartitionRequired = repartitionRequired; + this.defaultKeyValueMapper = new KeyValueMapper<K, V, String>() { + @Override + public String apply(K key, V value) { + return String.format("%s, %s", key, value); + } + }; } @Override @@ -178,63 +185,98 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public void print() { - print(null, null, null); + print(defaultKeyValueMapper, null, null, this.name); + } + + @Override + public void print(final String label) { + print(defaultKeyValueMapper, null, null, label); } @Override - public void print(String streamName) { - print(null, null, streamName); + public void print(final Serde<K> keySerde, final Serde<V> valSerde) { + print(defaultKeyValueMapper, keySerde, valSerde, this.name); } @Override - public void print(Serde<K> keySerde, Serde<V> valSerde) { - print(keySerde, valSerde, null); + public void print(final Serde<K> keySerde, final Serde<V> valSerde, final String label) { + print(defaultKeyValueMapper, keySerde, valSerde, label); } @Override - public void print(Serde<K> keySerde, Serde<V> valSerde, String streamName) { + public void print(final KeyValueMapper<? super K, ? super V, String> mapper) { + print(mapper, null, null, this.name); + } + + @Override + public void print(final KeyValueMapper<? super K, ? super V, String> mapper, final String label) { + print(mapper, null, null, label); + } + + @Override + public void print(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde) { + print(mapper, keySerde, valSerde, this.name); + } + + @Override + public void print(KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, Serde<V> valSerde, final String label) { + Objects.requireNonNull(mapper, "mapper can't be null"); + Objects.requireNonNull(label, "label can't be null"); String name = topology.newName(PRINTING_NAME); - streamName = (streamName == null) ? this.name : streamName; - topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null, streamName), keySerde, valSerde), this.name); + topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction<>(null, mapper, label), keySerde, valSerde), this.name); } + @Override + public void writeAsText(final String filePath) { + writeAsText(filePath, this.name, null, null, defaultKeyValueMapper); + } @Override - public void writeAsText(String filePath) { - writeAsText(filePath, null, null, null); + public void writeAsText(final String filePath, final String label) { + writeAsText(filePath, label, null, null, defaultKeyValueMapper); } @Override - public void writeAsText(String filePath, String streamName) { - writeAsText(filePath, streamName, null, null); + public void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V> valSerde) { + writeAsText(filePath, this.name, keySerde, valSerde, defaultKeyValueMapper); } + @Override + public void writeAsText(final String filePath, final String label, final Serde<K> keySerde, final Serde<V> valSerde) { + writeAsText(filePath, label, keySerde, valSerde, defaultKeyValueMapper); + } @Override - public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) { - writeAsText(filePath, null, keySerde, valSerde); + public void writeAsText(final String filePath, final KeyValueMapper<? super K, ? super V, String> mapper) { + writeAsText(filePath, this.name, null, null, mapper); + } + + @Override + public void writeAsText(final String filePath, final String label, final KeyValueMapper<? super K, ? super V, String> mapper) { + writeAsText(filePath, label, null, null, mapper); + } + + @Override + public void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V> valSerde, final KeyValueMapper<? super K, ? super V, String> mapper) { + writeAsText(filePath, this.name, keySerde, valSerde, mapper); } - /** - * @throws TopologyBuilderException if file is not found - */ @Override - public void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde) { + public void writeAsText(final String filePath, final String label, final Serde<K> keySerde, final Serde<V> valSerde, KeyValueMapper<? super K, ? super V, String> mapper) { Objects.requireNonNull(filePath, "filePath can't be null"); + Objects.requireNonNull(label, "label can't be null"); + Objects.requireNonNull(mapper, "mapper can't be null"); if (filePath.trim().isEmpty()) { throw new TopologyBuilderException("filePath can't be an empty string"); } - String name = topology.newName(PRINTING_NAME); - streamName = (streamName == null) ? this.name : streamName; + final String name = topology.newName(PRINTING_NAME); try { - PrintWriter printWriter = null; - printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name()); - topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(printWriter, streamName), keySerde, valSerde), this.name); + PrintWriter printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name()); + topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction<>(printWriter, mapper, label), keySerde, valSerde), this.name); } catch (FileNotFoundException | UnsupportedEncodingException e) { String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage(); throw new TopologyBuilderException(message); } - } @Override @@ -678,9 +720,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V this.repartitionRequired); } - - - private static <K, V> StateStoreSupplier createWindowedStateStore(final JoinWindows windows, final Serde<K> keySerde, final Serde<V> valueSerde, http://git-wip-us.apache.org/repos/asf/kafka/blob/fe4a469f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 912f42c..679efe5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -75,6 +75,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, private final ProcessorSupplier<?, ?> processorSupplier; + private final KeyValueMapper<K, V, String> defaultKeyValueMapper; + private final String queryableStoreName; private final boolean isQueryable; @@ -94,6 +96,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, this.keySerde = null; this.valSerde = null; this.isQueryable = isQueryable; + this.defaultKeyValueMapper = new KeyValueMapper<K, V, String>() { + @Override + public String apply(K key, V value) { + return String.format("%s, %s", key, value); + } + }; } public KTableImpl(KStreamBuilder topology, @@ -110,6 +118,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, this.keySerde = keySerde; this.valSerde = valSerde; this.isQueryable = isQueryable; + this.defaultKeyValueMapper = new KeyValueMapper<K, V, String>() { + @Override + public String apply(K key, V value) { + return String.format("%s, %s", key, value); + } + }; } @Override @@ -226,56 +240,56 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, @Override public void print() { - print(null, null, null); + print(null, null, this.name); } @Override - public void print(String streamName) { - print(null, null, streamName); + public void print(String label) { + print(null, null, label); } @Override public void print(Serde<K> keySerde, Serde<V> valSerde) { - print(keySerde, valSerde, null); + print(keySerde, valSerde, this.name); } @Override - public void print(Serde<K> keySerde, Serde<V> valSerde, String streamName) { + public void print(Serde<K> keySerde, final Serde<V> valSerde, String label) { + Objects.requireNonNull(label, "label can't be null"); String name = topology.newName(PRINTING_NAME); - streamName = (streamName == null) ? this.name : streamName; - topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null, streamName), keySerde, valSerde), this.name); + topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null, defaultKeyValueMapper, label), keySerde, valSerde), this.name); } @Override public void writeAsText(String filePath) { - writeAsText(filePath, null, null, null); + writeAsText(filePath, this.name, null, null); } @Override - public void writeAsText(String filePath, String streamName) { - writeAsText(filePath, streamName, null, null); + public void writeAsText(String filePath, String label) { + writeAsText(filePath, label, null, null); } @Override public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) { - writeAsText(filePath, null, keySerde, valSerde); + writeAsText(filePath, this.name, keySerde, valSerde); } /** * @throws TopologyBuilderException if file is not found */ @Override - public void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde) { + public void writeAsText(String filePath, String label, Serde<K> keySerde, Serde<V> valSerde) { Objects.requireNonNull(filePath, "filePath can't be null"); + Objects.requireNonNull(label, "label can't be null"); if (filePath.trim().isEmpty()) { throw new TopologyBuilderException("filePath can't be an empty string"); } String name = topology.newName(PRINTING_NAME); - streamName = (streamName == null) ? this.name : streamName; try { PrintWriter printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name()); - topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(printWriter, streamName), keySerde, valSerde), this.name); + topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(printWriter, defaultKeyValueMapper, label), keySerde, valSerde), this.name); } catch (FileNotFoundException | UnsupportedEncodingException e) { String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage(); throw new TopologyBuilderException(message); http://git-wip-us.apache.org/repos/asf/kafka/blob/fe4a469f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java index c537e0a..c94b868 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.PrintForeachAction; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; @@ -63,7 +64,13 @@ public class KStreamPrintTest { @Test public void testPrintKeyValueWithName() { - final KStreamPrint<Integer, String> kStreamPrint = new KStreamPrint<>(new PrintForeachAction(printWriter, "test-stream"), intSerd, stringSerd); + KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() { + @Override + public String apply(Integer key, String value) { + return String.format("%d, %s", key, value); + } + }; + final KStreamPrint<Integer, String> kStreamPrint = new KStreamPrint<>(new PrintForeachAction<>(printWriter, mapper, "test-stream"), intSerd, stringSerd); final List<KeyValue<Integer, String>> inputRecords = Arrays.asList( new KeyValue<>(0, "zero"), @@ -82,9 +89,42 @@ public class KStreamPrintTest { driver.process(topicName, record.key, record.value); } printWriter.flush(); - final String[] flushOutDatas = new String(byteOutStream.toByteArray(), Charset.forName("UTF-8")).split("\n"); + final String[] flushOutDatas = new String(byteOutStream.toByteArray(), Charset.forName("UTF-8")).split("\\r*\\n"); for (int i = 0; i < flushOutDatas.length; i++) { - assertEquals(flushOutDatas[i], expectedResult[i]); + assertEquals(expectedResult[i], flushOutDatas[i]); + } + } + + @Test + public void testPrintStreamWithProvidedKeyValueMapper() { + final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() { + @Override + public String apply(Integer key, String value) { + return String.format("(%d, %s)", key, value); + } + }; + final KStreamPrint<Integer, String> kStreamPrint = new KStreamPrint<>(new PrintForeachAction<>(printWriter, mapper, "test-stream"), intSerd, stringSerd); + + final List<KeyValue<Integer, String>> inputRecords = Arrays.asList( + new KeyValue<>(0, "zero"), + new KeyValue<>(1, "one"), + new KeyValue<>(2, "two"), + new KeyValue<>(3, "three")); + + final String[] expectedResult = {"[test-stream]: (0, zero)", "[test-stream]: (1, one)", "[test-stream]: (2, two)", "[test-stream]: (3, three)"}; + + final KStreamBuilder builder = new KStreamBuilder(); + final KStream<Integer, String> stream = builder.stream(intSerd, stringSerd, topicName); + stream.process(kStreamPrint); + + driver = new KStreamTestDriver(builder); + for (KeyValue<Integer, String> record: inputRecords) { + driver.process(topicName, record.key, record.value); + } + printWriter.flush(); + final String[] results = new String(byteOutStream.toByteArray(), Charset.forName("UTF-8")).split("\\r*\\n"); + for (int i = 0; i < results.length; i++) { + assertEquals(expectedResult[i], results[i]); } }