[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15530178#comment-15530178 ]
Aljoscha Krettek commented on FLINK-3674:
-----------------------------------------
It turns out that a generic {{TimelyFunction}} interface is not well
defined. I'm assuming that people will want to emit elements when they get
a timer callback. How would that work if the user function is a
{{FilterFunction}} or a {{ReduceFunction}}, neither of which is handed a
collector to emit to?
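To make the problem concrete, here is a minimal sketch of a filter that also receives timer callbacks. {{FilterFunction}} below is a stand-in that mirrors the shape of Flink's interface, and {{TimerCallback}} is a purely hypothetical generic hook invented for this illustration. It is not a proposed API.

```java
// Stand-in mirroring the shape of Flink's FilterFunction:
// the only "output" channel is the boolean return value.
interface FilterFunction<T> {
    boolean filter(T value) throws Exception;
}

// Hypothetical generic timer hook (an assumption, used only to show the problem).
interface TimerCallback {
    void onTimer(long timestamp) throws Exception;
}

class EvenFilter implements FilterFunction<Integer>, TimerCallback {
    int timersSeen = 0;

    @Override
    public boolean filter(Integer value) {
        return value % 2 == 0;
    }

    @Override
    public void onTimer(long timestamp) {
        // There is no Collector in scope and the method returns void,
        // so the callback can update internal state but cannot emit an element.
        timersSeen++;
    }
}

class FilterDemo {
    static int run() {
        EvenFilter f = new EvenFilter();
        f.filter(4);
        f.onTimer(100L);
        return f.timersSeen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A {{ReduceFunction}} has the same limitation, since its only output is the value returned from {{reduce()}}.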
For now I'm going for {{TimelyFlatMapFunction}} that looks like this:
{code}
@Public
public interface TimelyFlatMapFunction<I, O> extends Function, Serializable {

    /**
     * The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input
     * data set and transforms it into zero, one, or more elements.
     *
     * @param value The input value.
     * @param timerService A {@link TimerService} that allows setting timers and querying the
     *                     current time.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *                   operation to fail and may trigger recovery.
     */
    void flatMap(I value, TimerService timerService, Collector<O> out) throws Exception;

    /**
     * Called when a timer set using {@link TimerService} fires.
     *
     * @param timestamp The timestamp of the firing timer.
     * @param timeDomain The {@link TimeDomain} of the firing timer.
     * @param timerService A {@link TimerService} that allows setting timers and querying the
     *                     current time.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *                   operation to fail and may trigger recovery.
     */
    void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService,
            Collector<O> out) throws Exception;
}
{code}
In {{onTimer()}} the user can only emit elements of the output type of the
function.
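As a rough illustration of how a user function might implement this interface, here is a self-contained sketch that runs without Flink. Everything except the two method signatures of {{TimelyFlatMapFunction}} is an assumption: the members of {{TimerService}} ({{currentProcessingTime()}}, {{registerProcessingTimeTimer()}}), the {{TimeDomain}} constants, the {{EchoWithTimeout}} function, and the toy driver with a manually advanced clock are stand-ins, since the comment above does not spell them out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Stand-ins for the Flink types named above; their members are assumptions.
enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

interface Collector<T> {
    void collect(T record);
}

interface TimerService {
    long currentProcessingTime();
    void registerProcessingTimeTimer(long time);
}

interface TimelyFlatMapFunction<I, O> {
    void flatMap(I value, TimerService timerService, Collector<O> out) throws Exception;
    void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService,
            Collector<O> out) throws Exception;
}

// Hypothetical user function: forwards each element and sets a timer 10 ms ahead.
class EchoWithTimeout implements TimelyFlatMapFunction<String, String> {
    @Override
    public void flatMap(String value, TimerService timerService, Collector<String> out) {
        out.collect(value);
        timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 10);
    }

    @Override
    public void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService,
            Collector<String> out) {
        // Only values of the output type (String here) can be emitted.
        out.collect("timer@" + timestamp);
    }
}

// Toy timer service with a manually advanced clock; not how a Flink operator works.
class ManualTimerService implements TimerService {
    long now = 0;
    final TreeSet<Long> timers = new TreeSet<>();

    public long currentProcessingTime() { return now; }
    public void registerProcessingTimeTimer(long time) { timers.add(time); }
}

class TimelyDemo {
    static List<String> run() {
        List<String> output = new ArrayList<>();
        Collector<String> out = output::add;
        ManualTimerService timerService = new ManualTimerService();
        EchoWithTimeout fn = new EchoWithTimeout();

        fn.flatMap("a", timerService, out); // registers a timer for t=10
        timerService.now = 5;
        fn.flatMap("b", timerService, out); // registers a timer for t=15

        // Advance the clock and fire all due timers in timestamp order.
        timerService.now = 20;
        while (!timerService.timers.isEmpty() && timerService.timers.first() <= timerService.now) {
            long ts = timerService.timers.pollFirst();
            fn.onTimer(ts, TimeDomain.PROCESSING_TIME, timerService, out);
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a, b, timer@10, timer@15]
    }
}
```

Because {{flatMap()}} and {{onTimer()}} share the same {{Collector<O>}} type, elements emitted from a timer are indistinguishable downstream from elements emitted while processing input.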
> Add an interface for Time aware User Functions
> ----------------------------------------------
>
> Key: FLINK-3674
> URL: https://issues.apache.org/jira/browse/FLINK-3674
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Affects Versions: 1.0.0
> Reporter: Stephan Ewen
> Assignee: Aljoscha Krettek
>
> I suggest adding an interface that UDFs can implement, which will let them be
> notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
>     void onWatermark(Watermark watermark);
> }
>
> public class MyMapper implements MapFunction<String, String>, EventTimeFunction {
>
>     private long currentEventTime = Long.MIN_VALUE;
>
>     public String map(String value) {
>         return value + " @ " + currentEventTime;
>     }
>
>     public void onWatermark(Watermark watermark) {
>         currentEventTime = watermark.getTimestamp();
>     }
> }
> {code}