[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15395407#comment-15395407 ]
ASF GitHub Bot commented on FLINK-3674: --------------------------------------- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/2301#discussion_r72417108 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java --- @@ -55,13 +56,18 @@ /** the user function */ protected final F userFunction; + + protected EventTimeFunction eventTimeFunction; /** Flag to prevent duplicate function.close() calls in close() and dispose() */ private transient boolean functionsClosed = false; public AbstractUdfStreamOperator(F userFunction) { this.userFunction = requireNonNull(userFunction); + if(userFunction instanceof EventTimeFunction) { --- End diff -- Thanks for the super quick review. A quick question - In intellij IDE how should a formatting be applied? In Eclipse Ctrl+Shift+F applies the formatter that was configured. How about here? I can update the next commit based on that. Thank you. > Add an interface for EventTime aware User Function > -------------------------------------------------- > > Key: FLINK-3674 > URL: https://issues.apache.org/jira/browse/FLINK-3674 > Project: Flink > Issue Type: New Feature > Components: Streaming > Affects Versions: 1.0.0 > Reporter: Stephan Ewen > Assignee: ramkrishna.s.vasudevan > > I suggest to add an interface that UDFs can implement, which will let them be > notified upon watermark updates. > Example usage: > {code} > public interface EventTimeFunction { > void onWatermark(Watermark watermark); > } > public class MyMapper implements MapFunction<String, String>, > EventTimeFunction { > private long currentEventTime = Long.MIN_VALUE; > public String map(String value) { > return value + " @ " + currentEventTime; > } > public void onWatermark(Watermark watermark) { > currentEventTime = watermark.getTimestamp(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)