[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15588285#comment-15588285 ]
ASF GitHub Bot commented on FLINK-3674:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2570#discussion_r84038623
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
@@ -390,4 +425,141 @@ public void close() {
output.close();
}
}
+
+ // ------------------------------------------------------------------------
+ // Watermark handling
+ // ------------------------------------------------------------------------
+
+ /**
+ * Returns a {@link InternalTimerService} that can be used to query current processing time
+ * and event time and to set timers. An operator can have several timer services, where
+ * each has its own namespace serializer. Timer services are differentiated by the string
+ * key that is given when requesting them, if you call this method with the same key
+ * multiple times you will get the same timer service instance in subsequent requests.
+ *
+ * <p>Timers are always scoped to a key, the currently active key of a keyed stream operation.
+ * When a timer fires, this key will also be set as the currently active key.
+ *
+ * <p>Each timer has attached metadata, the namespace. Different timer services
+ * can have a different namespace type. If you don't need namespace differentiation you
+ * can use {@link VoidNamespaceSerializer} as the namespace serializer.
+ *
+ * @param name The name of the requested timer service. If no service exists under the given
+ * name a new one will be created and returned.
+ * @param keySerializer {@code TypeSerializer} for the keys of the timers.
+ * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
+ * @param triggerable The {@link Triggerable} that should be invoked when timers fire
+ *
+ * @param <K> The type of the timer keys.
+ * @param <N> The type of the timer namespace.
+ */
+ public <K, N> InternalTimerService<N> getInternalTimerService(
+ String name,
+ TypeSerializer<K> keySerializer,
+ TypeSerializer<N> namespaceSerializer,
+ Triggerable<K, N> triggerable) {
+
+ @SuppressWarnings("unchecked")
+ HeapInternalTimerService<K, N> service = (HeapInternalTimerService<K, N>) timerServices.get(name);
+
+ if (service == null) {
+ if (restoredServices != null && restoredServices.containsKey(name)) {
+ @SuppressWarnings("unchecked")
+ HeapInternalTimerService.RestoredTimers<K, N> restoredService =
--- End diff --
I can replace it with this, but I'm not sure it's more readable:
```
@SuppressWarnings("unchecked")
HeapInternalTimerService.RestoredTimers<K, N> restoredService =
    restoredServices == null
        ? null
        : (HeapInternalTimerService.RestoredTimers<K, N>) restoredServices.remove(name);
if (restoredService != null) {
    ...
}
```
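To make the comparison concrete, here is a minimal, self-contained sketch of the two lookup shapes. It is not Flink code: `RestoredLookupSketch`, both method names, and the `Map<String, String>` standing in for `restoredServices` are hypothetical. The diff above is cut off before the actual lookup, so the `remove` call in the first variant is an assumption as well.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the operator code; not part of Flink.
class RestoredLookupSketch {

    // Shape of the code in the diff: check for the key first, then take the entry.
    // (The diff is truncated before the lookup, so remove() here is an assumption.)
    static String takeWithContainsKey(Map<String, String> restored, String name) {
        if (restored != null && restored.containsKey(name)) {
            return restored.remove(name);
        }
        return null;
    }

    // Shape proposed in this comment: remove directly and null-check the result.
    static String takeWithRemove(Map<String, String> restored, String name) {
        return restored == null ? null : restored.remove(name);
    }

    public static void main(String[] args) {
        Map<String, String> restored = new HashMap<>();
        restored.put("window-timers", "restored timers");

        // The first call returns the entry and removes it; the second finds nothing.
        System.out.println(takeWithRemove(restored, "window-timers"));
        System.out.println(takeWithRemove(restored, "window-timers"));

        // A null map (nothing was restored) is handled by both variants.
        System.out.println(takeWithContainsKey(null, "window-timers"));
    }
}
```

The two variants behave the same as long as the map never stores `null` values, since `Map.remove` returns `null` both for a missing key and for a key mapped to `null`.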
> Add an interface for Time aware User Functions
> ----------------------------------------------
>
> Key: FLINK-3674
> URL: https://issues.apache.org/jira/browse/FLINK-3674
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Affects Versions: 1.0.0
> Reporter: Stephan Ewen
> Assignee: Aljoscha Krettek
>
> I suggest adding an interface that UDFs can implement, which will let them be
> notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
>     void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction<String, String>, EventTimeFunction {
>     private long currentEventTime = Long.MIN_VALUE;
>     public String map(String value) {
>         return value + " @ " + currentEventTime;
>     }
>     public void onWatermark(Watermark watermark) {
>         currentEventTime = watermark.getTimestamp();
>     }
> }
> {code}