[
https://issues.apache.org/jira/browse/KAFKA-4720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15852102#comment-15852102
]
ASF GitHub Bot commented on KAFKA-4720:
---------------------------------------
GitHub user stevenschlansker opened a pull request:
https://github.com/apache/kafka/pull/2493
KAFKA-4720: add a KStream#peek(ForeachAction<K, V>)
https://issues.apache.org/jira/browse/KAFKA-4720
Peek is a handy method to have to insert diagnostics that do not affect the
stream itself, but some external state such as logging or metrics collection.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/stevenschlansker/kafka kafka-4720-peek
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/2493.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2493
----
commit 3f06ee010028335674f1a9e21c0fa740e3d5a950
Author: Steven Schlansker <[email protected]>
Date: 2017-02-03T20:46:43Z
KAFKA-4720: add a KStream#peek(ForeachAction<K, V>)
----
> Add KStream.peek(ForeachAction<K,V>)
> ------------------------------------
>
> Key: KAFKA-4720
> URL: https://issues.apache.org/jira/browse/KAFKA-4720
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Affects Versions: 0.10.1.1
> Reporter: Steven Schlansker
>
> Java's Stream provides a handy peek method that observes elements in the
> stream without transforming or filtering them. While you can emulate this
> functionality with either a filter or map, peek provides potentially useful
> semantic information (doesn't modify the stream) and is much more concise.
> Example usage: using Dropwizard Metrics to provide event counters
> {code}
> KStream<Integer, String> s = ...;
> s.map(this::mungeData)
> .peek((i, s) -> metrics.noteMungedEvent(i, s))
> .filter(this::hadProcessingError)
> .print();
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)