[ https://issues.apache.org/jira/browse/KAFKA-4772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16344492#comment-16344492 ]
ASF GitHub Bot commented on KAFKA-4772: --------------------------------------- guozhangwang closed pull request #2669: KAFKA-4772: [WIP] Use peek to implement print URL: https://github.com/apache/kafka/pull/2669 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index a11d8f443b8..ab451dc3fdb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -325,6 +325,25 @@ void print(final Serde<K> keySerde, final Serde<V> valSerde, final String streamName); + /** + * Print the records of this stream to {@code System.out}. + * <p> + * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling + * {@code toString()} on the deserialized object. + * <p> + * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String}, + * {@link Integer} etc. to get meaningful information. + * + * @param keySerde key serde used to deserialize key if type is {@code byte[]}, + * @param valSerde value serde used to deserialize value if type is {@code byte[]}, + * @param streamName the name used to label the key/value pairs printed to the console + * @param mapper mapper to allow customized output of key and value + */ + void print(final Serde<K> keySerde, + final Serde<V> valSerde, + final String streamName, + final KeyValueMapper<K, V, String> mapper); + /** * Write the records of this stream to a file at the given path. 
* This function will use the generated name of the parent processor node to label the key/value pairs printed to diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 79abbb558eb..92c32b8c14c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -190,11 +190,42 @@ public void print(Serde<K> keySerde, Serde<V> valSerde) { @Override public void print(Serde<K> keySerde, Serde<V> valSerde, String streamName) { + print(keySerde, valSerde, streamName, null); + } + + @Override + public void print(final Serde<K> keySerde, final Serde<V> valSerde, String streamName, final KeyValueMapper<K, V, String> mapper){ String name = topology.newName(PRINTING_NAME); streamName = (streamName == null) ? this.name : streamName; - topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde, streamName), this.name); + topology.addProcessor(name, new KStreamPeek<>(printAction(System.out, keySerde, valSerde, streamName, mapper)), this.name); } + private static <K, V> ForeachAction<K, V> printAction(final PrintStream printStream, final Serde<?> keySerde, final Serde<?> valueSerde, final String streamName, final KeyValueMapper<K, V, String> mapper) { + return new ForeachAction<K, V>() { + @Override + public void apply(final K key, final V value) { + K keyToPrint = (K) maybeDeserialize(key, keySerde.deserializer()); + V valueToPrint = (V) maybeDeserialize(value, valueSerde.deserializer()); + if(mapper == null) { + printStream.println("[" + streamName + "]: " + keyToPrint + " , " + valueToPrint); + } else { + printStream.println("[" + streamName + "]: " + mapper.apply(keyToPrint, valueToPrint)); + } + } + + private Object maybeDeserialize(Object receivedElement, Deserializer<?> deserializer) { + if (receivedElement == null) { + 
return null; + } + + if (receivedElement instanceof byte[]) { + return deserializer.deserialize("Topic", (byte[]) receivedElement); + } + + return receivedElement; + } + }; + } @Override public void writeAsText(String filePath) { @@ -226,7 +257,7 @@ public void writeAsText(String filePath, String streamName, Serde<K> keySerde, S try { PrintStream printStream = new PrintStream(new FileOutputStream(filePath)); - topology.addProcessor(name, new KeyValuePrinter<>(printStream, keySerde, valSerde, streamName), this.name); + topology.addProcessor(name, new KStreamPeek<>(printAction(printStream, keySerde, valSerde, streamName, null)), this.name); } catch (FileNotFoundException e) { String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage(); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Exploit #peek to implement #print() and other methods > ----------------------------------------------------- > > Key: KAFKA-4772 > URL: https://issues.apache.org/jira/browse/KAFKA-4772 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Matthias J. Sax > Assignee: james chien > Priority: Minor > Labels: beginner, newbie > Fix For: 0.11.0.0 > > > From: https://github.com/apache/kafka/pull/2493#pullrequestreview-22157555 > Things that I can think of: > - print / writeAsText can be a special impl of peek; KStreamPrint etc. can be > removed. > - consider collapsing KStreamPeek with KStreamForeach with a flag parameter > indicating if the acted-upon key-value pair should still be forwarded. -- This message was sent by Atlassian JIRA (v7.6.3#76005)