[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585559#comment-15585559 ]
ASF GitHub Bot commented on FLINK-3674:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2570#discussion_r83857663
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for timely flatMap functions. FlatMap functions take elements and transform them,
+ * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
+ * and arrays.
+ *
+ * <p>A {@code TimelyFlatMapFunction} can, in addition to the functionality of a normal
+ * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set timers and react
+ * to them firing.
+ *
+ * <pre>{@code
+ * DataStream<X> input = ...;
+ *
+ * DataStream<Y> result = input.flatMap(new MyTimelyFlatMapFunction());
+ * }</pre>
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+@PublicEvolving
+public interface TimelyFlatMapFunction<I, O> extends Function, Serializable {
+
+ /**
+ * The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input data set and transforms
+ * it into zero, one, or more elements.
+ *
+ * @param value The input value.
+ * @param timerService A {@link TimerService} that allows setting timers and querying the
+ * current time.
+ * @param out The collector for returning result values.
+ *
+ * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+ * to fail and may trigger recovery.
+ */
+ void flatMap(I value, TimerService timerService, Collector<O> out) throws Exception;
+
+ /**
+ * Called when a timer set using {@link TimerService} fires.
+ *
+ * @param timestamp The timestamp of the firing timer.
+ * @param timeDomain The {@link TimeDomain} of the firing timer.
+ * @param timerService A {@link TimerService} that allows setting timers and querying the
+ * current time.
+ * @param out The collector for returning result values.
+ *
+ * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+ * to fail and may trigger recovery.
+ */
+ void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<O> out) throws Exception ;
--- End diff --
I wonder whether `TimeDomain` and `TimerService` need to be parameters of the
methods in this interface. I assume both remain stable for the lifetime of the
UDF, so they could be passed once in an init method, which could also be
pre-implemented in a `RichTimelyFlatMapFunction`. Maybe there is a good reason
against this, but I like to keep the number of parameters small when possible.
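A minimal sketch of that idea, to make the suggestion concrete. It is not code from the pull request: `TimeDomain`, `TimerService` and `Collector` below are simplified stand-ins rather than the Flink types, and `SlimTimelyFlatMapFunction`, `init`, `WordSplitter` and `RecordingTimerService` are hypothetical names. It assumes, as the comment does, that both the time domain and the timer service stay fixed for the lifetime of the UDF.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins so the sketch compiles on its own; NOT the real Flink API.
enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

interface TimerService {
    long currentTime();
    void registerTimer(long time);
}

interface Collector<T> {
    void collect(T record);
}

// Hypothetical slimmed-down interface: the context is handed over once via init().
interface SlimTimelyFlatMapFunction<I, O> {
    void init(TimeDomain timeDomain, TimerService timerService);
    void flatMap(I value, Collector<O> out) throws Exception;
    void onTimer(long timestamp, Collector<O> out) throws Exception;
}

// Hypothetical base class that pre-implements init() and stores the context.
abstract class RichTimelyFlatMapFunction<I, O> implements SlimTimelyFlatMapFunction<I, O> {
    private TimeDomain timeDomain;
    private TimerService timerService;

    @Override
    public final void init(TimeDomain timeDomain, TimerService timerService) {
        this.timeDomain = timeDomain;
        this.timerService = timerService;
    }

    protected TimeDomain getTimeDomain() { return timeDomain; }
    protected TimerService getTimerService() { return timerService; }
}

// Example UDF: emits each word, then schedules a timer 10 time units ahead.
class WordSplitter extends RichTimelyFlatMapFunction<String, String> {
    @Override
    public void flatMap(String value, Collector<String> out) {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
        TimerService timers = getTimerService();
        timers.registerTimer(timers.currentTime() + 10);
    }

    @Override
    public void onTimer(long timestamp, Collector<String> out) {
        out.collect("timer@" + timestamp + " in " + getTimeDomain());
    }
}

// Tiny in-memory TimerService used only to drive the sketch.
class RecordingTimerService implements TimerService {
    final List<Long> timers = new ArrayList<>();
    long now = 100L;

    @Override public long currentTime() { return now; }
    @Override public void registerTimer(long time) { timers.add(time); }
}
```

With this shape the user methods shrink to two parameters each. The cost is that `onTimer` can no longer tell timers from different domains apart, so the approach only works if a function really does use a single `TimeDomain`.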
> Add an interface for Time aware User Functions
> ----------------------------------------------
>
> Key: FLINK-3674
> URL: https://issues.apache.org/jira/browse/FLINK-3674
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Affects Versions: 1.0.0
> Reporter: Stephan Ewen
> Assignee: Aljoscha Krettek
>
> I suggest adding an interface that UDFs can implement, which will let them be
> notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
>     void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction<String, String>, EventTimeFunction {
>     private long currentEventTime = Long.MIN_VALUE;
>     public String map(String value) {
>         return value + " @ " + currentEventTime;
>     }
>     public void onWatermark(Watermark watermark) {
>         currentEventTime = watermark.getTimestamp();
>     }
> }
> {code}