[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5481 ---
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r172136142 --- Diff: docs/dev/stream/operators/process_function.md --- @@ -242,4 +242,17 @@ class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String the current processing time as event-time timestamp. This behavior is very subtle and might not be noticed by users. Well, it's harmful because processing-time timestamps are indeterministic and not aligned with watermarks. Besides, user-implemented logic depends on this wrong timestamp highly likely is unintendedly faulty. So we've decided to fix it. Upon upgrading to 1.4.0, Flink jobs -that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic. \ No newline at end of file +that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic. + +## The KeyedProcessFunction + +`KeyedProcessFunction`, as an extension of `ProcessFunction`, gives access to the key of timers in its `onTimer(...)` +method. + +{% highlight java %} --- End diff -- @bowenli86 As soon as the scala example is added, I can take care of the other two comments and merge! Let me know when you update the PR, and thanks for the work! ---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r172135390 --- Diff: docs/dev/stream/operators/process_function.md --- @@ -242,4 +242,17 @@ class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String the current processing time as event-time timestamp. This behavior is very subtle and might not be noticed by users. Well, it's harmful because processing-time timestamps are indeterministic and not aligned with watermarks. Besides, user-implemented logic depends on this wrong timestamp highly likely is unintendedly faulty. So we've decided to fix it. Upon upgrading to 1.4.0, Flink jobs -that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic. \ No newline at end of file +that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic. + +## The KeyedProcessFunction + +`KeyedProcessFunction`, as an extension of `ProcessFunction`, gives access to the key of timers in its `onTimer(...)` +method. + +{% highlight java %} --- End diff -- Maybe also add Scala example code. ---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r172135104 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java --- @@ -70,21 +69,15 @@ public void open() throws Exception { @Override public void onEventTime(InternalTimer timer) throws Exception { collector.setAbsoluteTimestamp(timer.getTimestamp()); - onTimerContext.timeDomain = TimeDomain.EVENT_TIME; - onTimerContext.timer = timer; - userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector); - onTimerContext.timeDomain = null; - onTimerContext.timer = null; + reinitialize(userFunction, TimeDomain.EVENT_TIME, timer); --- End diff -- Hate to be picky, but I think the name is a bit misleading and we could probably put all of this in a method `invokeUserTime()` that does what `reinitialize()` and `reset()` do. @kl0u I think you can quickly fix that when merging. ---
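A minimal sketch of the refactoring suggested in the comment above: setting up the timer context, calling the user function, and clearing the context again all happen in one private method. Everything here is a plain-Java stand-in, not Flink code: `DomainStub`, `TimerStub`, `TimerContextStub`, `UserFunctionStub`, and `OperatorSketch` are hypothetical names, the helper is called `invokeUserFunction` purely for illustration, and the `try`/`finally` is an assumption that is not part of the quoted diff.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a time-domain enum. */
enum DomainStub { EVENT_TIME, PROCESSING_TIME }

/** Hypothetical stand-in for an internal timer: a key plus a timestamp. */
final class TimerStub {
    final String key;
    final long timestamp;

    TimerStub(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }
}

/** Hypothetical stand-in for the reusable on-timer context held by the operator. */
final class TimerContextStub {
    DomainStub timeDomain;
    TimerStub timer;

    String currentKey() {
        return timer.key;
    }
}

/** Hypothetical stand-in for the user function's timer callback. */
interface UserFunctionStub {
    void onTimer(long timestamp, TimerContextStub ctx, List<String> out);
}

final class OperatorSketch {
    private final TimerContextStub onTimerContext = new TimerContextStub();
    private final UserFunctionStub userFunction;
    final List<String> output = new ArrayList<>();

    OperatorSketch(UserFunctionStub userFunction) {
        this.userFunction = userFunction;
    }

    void onEventTime(TimerStub timer) {
        invokeUserFunction(DomainStub.EVENT_TIME, timer);
    }

    void onProcessingTime(TimerStub timer) {
        invokeUserFunction(DomainStub.PROCESSING_TIME, timer);
    }

    /** Sets up the context, calls the user function, then clears the context. */
    private void invokeUserFunction(DomainStub domain, TimerStub timer) {
        onTimerContext.timeDomain = domain;
        onTimerContext.timer = timer;
        try {
            userFunction.onTimer(timer.timestamp, onTimerContext, output);
        } finally {
            // Assumption: clear in finally so a failing callback leaves no stale state.
            onTimerContext.timeDomain = null;
            onTimerContext.timer = null;
        }
    }

    boolean contextIsCleared() {
        return onTimerContext.timeDomain == null && onTimerContext.timer == null;
    }
}
```

With a single helper, both `onEventTime` and `onProcessingTime` shrink to one call each, and the set/reset pair can no longer get out of sync.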
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r172133573 --- Diff: docs/dev/stream/operators/process_function.md --- @@ -242,4 +242,17 @@ class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String the current processing time as event-time timestamp. This behavior is very subtle and might not be noticed by users. Well, it's harmful because processing-time timestamps are indeterministic and not aligned with watermarks. Besides, user-implemented logic depends on this wrong timestamp highly likely is unintendedly faulty. So we've decided to fix it. Upon upgrading to 1.4.0, Flink jobs -that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic. \ No newline at end of file +that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic. + +## The KeyedProcessFunction + +`KeyedProcessFunction`, as an extension of `ProcessFunction`, gives access to the key of timers in its `onTimer(...)` +method. + +{% highlight java %} +@Override +public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { --- End diff -- I believe this is now `public void onTimer(long timestamp, OnTimerContext ctx, Collector out)`, right? @kl0u you could fix this while merging. ---
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r171566773 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala --- @@ -66,9 +66,9 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] * function, this function can also query the time and set timers. When reacting to the firing * of set timers the function can directly emit elements and/or register yet more timers. * -* @param processFunction The [[ProcessFunction]] that is called for each element -* in the stream. +* @param processFunction The [[ProcessFunction]] that is called for each element in the stream. --- End diff -- Please also add that the user now should use the new `KeyedProcessFunction` instead. ---
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r171233924 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -321,19 +324,80 @@ private boolean validateKeyTypeIsHashable(TypeInformation type) { * @param The type of elements emitted by the {@code ProcessFunction}. * * @return The transformed {@link DataStream}. +* +* @deprecated Use {@link KeyedStream#process(KeyedProcessFunction, TypeInformation)} */ + @Deprecated @Override @Internal public SingleOutputStreamOperator process( ProcessFunction processFunction, TypeInformation outputType) { - KeyedProcessOperator operator = - new KeyedProcessOperator<>(clean(processFunction)); + LegacyKeyedProcessOperator operator = new LegacyKeyedProcessOperator<>(clean(processFunction)); return transform("Process", outputType, operator); } + /** +* Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream. +* +* The function will be called for every element in the input streams and can produce zero +* or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)} +* function, this function can also query the time and set timers. When reacting to the firing +* of set timers the function can directly emit elements and/or register yet more timers. +* +* @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream. +* +* @param The type of key in {@code KeyedProcessFunction}. +* +* @param The type of elements emitted by the {@code KeyedProcessFunction}. +* +* @return The transformed {@link DataStream}. 
+*/ + @PublicEvolving + public SingleOutputStreamOperator process(KeyedProcessFunction keyedProcessFunction) { + --- End diff -- The signature should be: ```public <R> SingleOutputStreamOperator<R> process(KeyedProcessFunction<KEY, T, R> keyedProcessFunction) {``` You do not have to re-define the type of the key (the `<K>` in the beginning) as we are already in a `KeyedStream` with an already defined type of key. Also remove the corresponding part in the `javadoc`. ---
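To illustrate the point about not re-declaring the key type: a type parameter declared on the method is independent of the class-level key type, so the compiler would accept a function keyed by any type at all. The sketch below uses hypothetical stand-ins (`KeyedFn`, `KeyedStreamSketch`), not Flink's classes, and only assumes that the enclosing class already carries the key type.

```java
/** Hypothetical stand-in for a keyed function with key K, input I, output O. */
interface KeyedFn<K, I, O> {
    O apply(K key, I value);
}

/** Hypothetical stand-in for a keyed stream over elements T with key type KEY. */
final class KeyedStreamSketch<T, KEY> {
    private final KEY key;
    private final T element;

    KeyedStreamSketch(KEY key, T element) {
        this.key = key;
        this.element = element;
    }

    /**
     * Only the output type R is declared on the method. The key type is the
     * class-level KEY, so a function keyed by another type does not compile.
     */
    <R> R process(KeyedFn<KEY, T, R> fn) {
        return fn.apply(key, element);
    }

    // Anti-pattern, left as a comment: re-declaring the key type on the method.
    //   <K, R> R process(KeyedFn<K, T, R> fn) { ... }
    // Here K is unrelated to KEY, so the stream's own key could not be handed
    // to fn without an unchecked cast.
}
```

A call such as `new KeyedStreamSketch<Integer, String>("a", 3).process((k, v) -> k + ":" + v)` then infers only `R`; the key type is fixed by the stream.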
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r171237037 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala --- @@ -79,7 +78,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]])) } - + + /** + * Applies the given [[KeyedProcessFunction]] on the input stream, thereby + * creating a transformed output stream. + * + * The function will be called for every element in the stream and can produce + * zero or more output. The function can also query the time and set timers. When + * reacting to the firing of set timers the function can emit yet more elements. + * + * The function will be called for every element in the input streams and can produce zero + * or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]] + * function, this function can also query the time and set timers. When reacting to the firing + * of set timers the function can directly emit elements and/or register yet more timers. + * + * @param keyedProcessFunction The [[KeyedProcessFunction]] that is called for each element + * in the stream. + */ + @PublicEvolving + def process[K, R: TypeInformation]( --- End diff -- As in java, you do not need to redefine the `K` here. So you can remove it `def process[R: TypeInformation](`... ---
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r171234582 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -321,19 +324,80 @@ private boolean validateKeyTypeIsHashable(TypeInformation type) { * @param The type of elements emitted by the {@code ProcessFunction}. * * @return The transformed {@link DataStream}. +* +* @deprecated Use {@link KeyedStream#process(KeyedProcessFunction, TypeInformation)} */ + @Deprecated @Override @Internal public SingleOutputStreamOperator process( ProcessFunction processFunction, TypeInformation outputType) { - KeyedProcessOperator operator = - new KeyedProcessOperator<>(clean(processFunction)); + LegacyKeyedProcessOperator operator = new LegacyKeyedProcessOperator<>(clean(processFunction)); return transform("Process", outputType, operator); } + /** +* Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream. +* +* The function will be called for every element in the input streams and can produce zero +* or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)} +* function, this function can also query the time and set timers. When reacting to the firing +* of set timers the function can directly emit elements and/or register yet more timers. +* +* @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream. +* +* @param The type of key in {@code KeyedProcessFunction}. +* +* @param The type of elements emitted by the {@code KeyedProcessFunction}. +* +* @return The transformed {@link DataStream}. 
+*/ + @PublicEvolving + public SingleOutputStreamOperator process(KeyedProcessFunction keyedProcessFunction) { + + TypeInformation outType = TypeExtractor.getUnaryOperatorReturnType( + keyedProcessFunction, + KeyedProcessFunction.class, + 0, + 1, + TypeExtractor.NO_INDEX, + TypeExtractor.NO_INDEX, + getType(), + Utils.getCallLocationName(), + true); + + return process(keyedProcessFunction, outType); + } + + /** +* Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream. +* +* The function will be called for every element in the input streams and can produce zero +* or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)} +* function, this function can also query the time and set timers. When reacting to the firing +* of set timers the function can directly emit elements and/or register yet more timers. +* +* @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream. +* +* @param outputType {@link TypeInformation} for the result type of the function. +* +* @param The type of key in {@code KeyedProcessFunction}. +* +* @param The type of elements emitted by the {@code KeyedProcessFunction}. +* +* @return The transformed {@link DataStream}. +*/ + @Internal + public SingleOutputStreamOperator process( + KeyedProcessFunction keyedProcessFunction, --- End diff -- Same here, you do not have to redefine the type of the key. ---
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r171235356 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +/** + * A keyed function that processes elements of a stream. + * + * For every element in the input stream {@link #processElement(Object, Context, Collector)} + * is invoked. This can produce zero or more elements as output. Implementations can also + * query the time and set timers through the provided {@link Context}. For firing timers + * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce + * zero or more elements as output and register further timers. 
+ * + * NOTE: Access to keyed state and timers (which are also scoped to a key) is only + * available if the {@code KeyedProcessFunction} is applied on a {@code KeyedStream}. + * + * NOTE: A {@code KeyedProcessFunction} is always a + * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and + * teardown methods can be implemented. See + * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)} + * and {@link org.apache.flink.api.common.functions.RichFunction#close()}. + * + * @param Type of the key. + * @param Type of the input elements. + * @param Type of the output elements. + */ +@PublicEvolving +public abstract class KeyedProcessFunction extends AbstractRichFunction { + + private static final long serialVersionUID = 1L; + + /** +* Process one element from the input stream. +* +* This function can output zero or more elements using the {@link Collector} parameter +* and also update internal state or set timers using the {@link Context} parameter. +* +* @param value The input value. +* @param ctx A {@link Context} that allows querying the timestamp of the element and getting +*a {@link TimerService} for registering timers and querying the time. The +*context is only valid during the invocation of this method, do not store it. +* @param out The collector for returning result values. +* +* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation +* to fail and may trigger recovery. +*/ + public abstract void processElement(I value, Context ctx, Collector out) throws Exception; + + /** +* Called when a timer set using {@link TimerService} fires. +* +* @param timestamp The timestamp of the firing timer. 
+* @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link TimeDomain}, and the key +*of the firing timer and getting a {@link TimerService} for registering timers and querying the time. +*The context is only valid during the invocation of this method, do not store it. +* @param out The collector for returning result values. +* +* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation +* to fail and may trigger recovery. +*/ + public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {} + --- End diff --
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r171236298 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala --- @@ -54,21 +54,20 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] // /** -* Applies the given [[ProcessFunction]] on the input stream, thereby -* creating a transformed output stream. -* -* The function will be called for every element in the stream and can produce -* zero or more output. The function can also query the time and set timers. When -* reacting to the firing of set timers the function can emit yet more elements. -* -* The function will be called for every element in the input streams and can produce zero -* or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]] -* function, this function can also query the time and set timers. When reacting to the firing -* of set timers the function can directly emit elements and/or register yet more timers. -* -* @param processFunction The [[ProcessFunction]] that is called for each element -* in the stream. -*/ + * Applies the given [[ProcessFunction]] on the input stream, thereby + * creating a transformed output stream. + * + * The function will be called for every element in the stream and can produce + * zero or more output. The function can also query the time and set timers. When + * reacting to the firing of set timers the function can emit yet more elements. + * + * The function will be called for every element in the input streams and can produce zero + * or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]] + * function, this function can also query the time and set timers. When reacting to the firing + * of set timers the function can directly emit elements and/or register yet more timers. + * + * @param processFunction The [[ProcessFunction]] that is called for each element in the stream. 
+ */ --- End diff -- Revert all reformattings (indent by 1 space) and also add the `@deprecated` annotation with the correct, non-deprecated alternative, as done in the corresponding `java` class. ---
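For reference, the Java-side pattern that the comment above points to (deprecating the old overload and naming its replacement) could be sketched as follows. The types `OldFn`, `NewKeyedFn`, and `DeprecationSketch` are hypothetical stand-ins rather than Flink classes, and the reflective `isDeprecated` helper exists only to make the annotation checkable.

```java
import java.lang.reflect.Method;

/** Hypothetical stand-in for the old, non-keyed function type. */
interface OldFn<I, O> {
    O apply(I value);
}

/** Hypothetical stand-in for the new function type that also receives the key. */
interface NewKeyedFn<K, I, O> {
    O apply(K key, I value);
}

final class DeprecationSketch<T, KEY> {
    private final KEY key;
    private final T element;

    DeprecationSketch(KEY key, T element) {
        this.key = key;
        this.element = element;
    }

    /**
     * Applies the old function type.
     *
     * @deprecated Use {@link #process(NewKeyedFn)} instead.
     */
    @Deprecated
    <R> R process(OldFn<T, R> fn) {
        return fn.apply(element);
    }

    /** Applies the keyed function type, which also receives the key. */
    <R> R process(NewKeyedFn<KEY, T, R> fn) {
        return fn.apply(key, element);
    }

    /** Reports whether the process overload taking the given parameter type is deprecated. */
    static boolean isDeprecated(Class<?> parameterType) {
        for (Method m : DeprecationSketch.class.getDeclaredMethods()) {
            if (m.getName().equals("process") && m.getParameterTypes()[0] == parameterType) {
                return m.isAnnotationPresent(Deprecated.class);
            }
        }
        throw new IllegalArgumentException("No process overload for " + parameterType);
    }
}
```

The Javadoc `@deprecated` tag tells readers what to use instead, while the `@Deprecated` annotation is what the compiler and tools act on; the two are meant to appear together.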
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r171237136 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala --- @@ -79,7 +78,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]])) } - + + /** + * Applies the given [[KeyedProcessFunction]] on the input stream, thereby + * creating a transformed output stream. + * + * The function will be called for every element in the stream and can produce + * zero or more output. The function can also query the time and set timers. When + * reacting to the firing of set timers the function can emit yet more elements. + * + * The function will be called for every element in the input streams and can produce zero + * or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]] + * function, this function can also query the time and set timers. When reacting to the firing + * of set timers the function can directly emit elements and/or register yet more timers. + * + * @param keyedProcessFunction The [[KeyedProcessFunction]] that is called for each element + * in the stream. + */ + @PublicEvolving + def process[K, R: TypeInformation]( +keyedProcessFunction: KeyedProcessFunction[K, T, R]): DataStream[R] = { + +if (keyedProcessFunction == null) { + throw new NullPointerException("ProcessFunction must not be null.") +} --- End diff -- The message now should be `"KeyedProcessFunction must not be null."` ---
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r171233273 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -84,18 +86,18 @@ * elements that have the same key. * * @param <T> The type of the elements in the Keyed Stream. - * @param <KEY> The type of the key in the Keyed Stream. + * @param <K> The type of the key in the Keyed Stream. --- End diff -- Revert the renaming from `KEY` to `K`. ---
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r171235653 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java --- @@ -132,15 +139,15 @@ public TimerService timerService() { } } - private class OnTimerContextImpl extends ProcessFunction.OnTimerContext{ + private class OnTimerContextImpl extends KeyedProcessFunction.OnTimerContext { --- End diff -- With the proposed changes you can also remove the `<K>` in the `.OnTimerContext`. ---
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r171234411 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -321,19 +324,80 @@ private boolean validateKeyTypeIsHashable(TypeInformation type) { * @param The type of elements emitted by the {@code ProcessFunction}. * * @return The transformed {@link DataStream}. +* +* @deprecated Use {@link KeyedStream#process(KeyedProcessFunction, TypeInformation)} */ + @Deprecated @Override @Internal public SingleOutputStreamOperator process( ProcessFunction processFunction, TypeInformation outputType) { - KeyedProcessOperator operator = - new KeyedProcessOperator<>(clean(processFunction)); + LegacyKeyedProcessOperator operator = new LegacyKeyedProcessOperator<>(clean(processFunction)); return transform("Process", outputType, operator); } + /** +* Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream. +* +* The function will be called for every element in the input streams and can produce zero +* or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)} +* function, this function can also query the time and set timers. When reacting to the firing +* of set timers the function can directly emit elements and/or register yet more timers. +* +* @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream. +* +* @param The type of key in {@code KeyedProcessFunction}. +* +* @param The type of elements emitted by the {@code KeyedProcessFunction}. +* +* @return The transformed {@link DataStream}. 
+*/ + @PublicEvolving + public SingleOutputStreamOperator process(KeyedProcessFunction keyedProcessFunction) { + + TypeInformation outType = TypeExtractor.getUnaryOperatorReturnType( + keyedProcessFunction, + KeyedProcessFunction.class, + 0, --- End diff -- The indices here are not `0` and `1` for input and output type, but `1` and `2`. In the process function it was 0 and 1 because we did not have the key. ---
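The index shift described above can be reproduced with plain Java reflection. This sketch does not use Flink's `TypeExtractor`; `FnSketch`, `KeyedFnSketch`, and `TypeArgIndexDemo` are hypothetical stand-ins that only mirror the shape of a two-parameter (input, output) and a three-parameter (key, input, output) base class.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/** Hypothetical two-parameter base class, shaped like a non-keyed function (input, output). */
abstract class FnSketch<I, O> { }

/** Hypothetical three-parameter base class, shaped like a keyed function (key, input, output). */
abstract class KeyedFnSketch<K, I, O> { }

final class TypeArgIndexDemo {

    /** Returns the type argument at the given position of the direct generic superclass. */
    static Type typeArgumentAt(Class<?> subclass, int index) {
        ParameterizedType parent = (ParameterizedType) subclass.getGenericSuperclass();
        return parent.getActualTypeArguments()[index];
    }

    // Concrete subclasses, so the type arguments are recorded in the class metadata.
    static final class Plain extends FnSketch<Long, Double> { }

    static final class Keyed extends KeyedFnSketch<String, Long, Double> { }

    public static void main(String[] args) {
        // Non-keyed shape: input at index 0, output at index 1.
        System.out.println(typeArgumentAt(Plain.class, 0) + " -> " + typeArgumentAt(Plain.class, 1));
        // Keyed shape: the key occupies index 0, so input and output move to 1 and 2.
        System.out.println(typeArgumentAt(Keyed.class, 1) + " -> " + typeArgumentAt(Keyed.class, 2));
    }
}
```

Passing `0` and `1` for the keyed shape would pick up the key type as the input and the input type as the output, which is the mistake the comment flags.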
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r171235118 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +/** + * A keyed function that processes elements of a stream. + * + * For every element in the input stream {@link #processElement(Object, Context, Collector)} + * is invoked. This can produce zero or more elements as output. Implementations can also + * query the time and set timers through the provided {@link Context}. For firing timers + * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce + * zero or more elements as output and register further timers. 
+ * + * NOTE: Access to keyed state and timers (which are also scoped to a key) is only + * available if the {@code KeyedProcessFunction} is applied on a {@code KeyedStream}. + * + * NOTE: A {@code KeyedProcessFunction} is always a + * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and + * teardown methods can be implemented. See + * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)} + * and {@link org.apache.flink.api.common.functions.RichFunction#close()}. + * + * @param Type of the key. + * @param Type of the input elements. + * @param Type of the output elements. + */ +@PublicEvolving +public abstract class KeyedProcessFunction extends AbstractRichFunction { + + private static final long serialVersionUID = 1L; + + /** +* Process one element from the input stream. +* +* This function can output zero or more elements using the {@link Collector} parameter +* and also update internal state or set timers using the {@link Context} parameter. +* +* @param value The input value. +* @param ctx A {@link Context} that allows querying the timestamp of the element and getting +*a {@link TimerService} for registering timers and querying the time. The +*context is only valid during the invocation of this method, do not store it. +* @param out The collector for returning result values. +* +* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation +* to fail and may trigger recovery. +*/ + public abstract void processElement(I value, Context ctx, Collector out) throws Exception; + + /** +* Called when a timer set using {@link TimerService} fires. +* +* @param timestamp The timestamp of the firing timer. 
+* @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link TimeDomain}, and the key +*of the firing timer and getting a {@link TimerService} for registering timers and querying the time. +*The context is only valid during the invocation of this method, do not store it. +* @param out The collector for returning result values. +* +* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation +* to fail and may trigger recovery. +*/ + public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {} + + /** +* Inf
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r169571398 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala --- @@ -79,7 +78,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]])) } - + + /** +* Applies the given [[KeyedProcessFunction]] on the input stream, thereby --- End diff -- ditto ---
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r169573342 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java --- @@ -654,6 +648,63 @@ public ExecutionConfig getExecutionConfig() { return transform("Process", outputType, operator); } + /** +* Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream. +* +* The function will be called for every element in the input streams and can produce zero +* or more output elements. +* +* @param keyedProcessFunction The {@link ProcessFunction} that is called for each element in the stream. +* +* @param The type of key in {@code KeyedProcessFunction}. +* +* @param The type of elements emitted by the {@code KeyedProcessFunction}. +* +* @return The transformed {@link DataStream}. +*/ + @PublicEvolving + public SingleOutputStreamOperator process(KeyedProcessFunction keyedProcessFunction) { --- End diff -- Does it make sense to add `process` with a `KeyedProcessFunction` on a non-keyed `DataStream`? ---
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r169571436 --- Diff: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala --- @@ -448,11 +447,35 @@ class DataStreamTest extends AbstractTestBase { val flatMapped = src.keyBy(x => x).process(processFunction) assert(processFunction == getFunctionForDataStream(flatMapped)) + assert(getOperatorForDataStream(flatMapped).isInstanceOf[LegacyKeyedProcessOperator[_, _, _]]) + } + + /** +* Verify that a [[KeyedStream.process(KeyedProcessFunction)]] call is correctly --- End diff -- ditto ---
[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r169571457 --- Diff: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala --- @@ -473,6 +496,28 @@ class DataStreamTest extends AbstractTestBase { assert(getOperatorForDataStream(flatMapped).isInstanceOf[ProcessOperator[_, _]]) } + /** +* Verify that a [[DataStream.process(KeyedProcessFunction)]] call is correctly --- End diff -- ditto ---
[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r169571197 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala --- @@ -686,6 +686,27 @@ class DataStream[T](stream: JavaStream[T]) { asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]])) } + /** +* Applies the given [[KeyedProcessFunction]] on the input stream, thereby --- End diff -- ditto ---
[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r169571208 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala --- @@ -666,18 +666,18 @@ class DataStream[T](stream: JavaStream[T]) { } /** - * Applies the given [[ProcessFunction]] on the input stream, thereby - * creating a transformed output stream. - * - * The function will be called for every element in the stream and can produce - * zero or more output. - * - * @param processFunction The [[ProcessFunction]] that is called for each element - * in the stream. - */ +* Applies the given [[ProcessFunction]] on the input stream, thereby --- End diff -- Can you revert this formatting? I think proper Javadoc formatting should be as it was before, with each `*` indented under the opening `/**`:
```
/**
 *
 */
```
instead of the `*` starting flush with the `/`:
```
/**
*
*/
```
---
[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...
Github user juergenthomann commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r169077241 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java --- @@ -654,6 +648,63 @@ public ExecutionConfig getExecutionConfig() { return transform("Process", outputType, operator); } + /** +* Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream. +* +* The function will be called for every element in the input streams and can produce zero +* or more output elements. +* +* @param keyedProcessFunction The {@link ProcessFunction} that is called for each element in the stream. +* +* @param The type of key in {@code KeyedProcessFunction}. +* +* @param The type of elements emitted by the {@code PKeyedProcessFunction}. --- End diff -- Could this be a typo with **P**KeyedProcessFunction instead of KeyedProcessFunction? ---
[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r169060125 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java --- @@ -377,119 +375,114 @@ public T getKey(T value) throws Exception { } } - private static class QueryingFlatMapFunction extends ProcessFunction { + private static class QueryingFlatMapFunction extends KeyedProcessFunction { private static final long serialVersionUID = 1L; - private final TimeDomain timeDomain; + private final TimeDomain expectedTimeDomain; public QueryingFlatMapFunction(TimeDomain timeDomain) { - this.timeDomain = timeDomain; + this.expectedTimeDomain = timeDomain; } @Override public void processElement(Integer value, Context ctx, Collector out) throws Exception { - if (timeDomain.equals(TimeDomain.EVENT_TIME)) { + if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) { out.collect(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()); } else { out.collect(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()); } } @Override - public void onTimer( - long timestamp, - OnTimerContext ctx, - Collector out) throws Exception { + public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { + // Do nothing } } - private static class TriggeringFlatMapFunction extends ProcessFunction { + private static class TriggeringFlatMapFunction extends KeyedProcessFunction { private static final long serialVersionUID = 1L; - private final TimeDomain timeDomain; + private final TimeDomain expectedTimeDomain; + + static final Integer EXPECTED_KEY = 17; public TriggeringFlatMapFunction(TimeDomain timeDomain) { - this.timeDomain = timeDomain; + this.expectedTimeDomain = timeDomain; } @Override public void processElement(Integer value, Context ctx, Collector out) throws Exception { out.collect(value); - if 
(timeDomain.equals(TimeDomain.EVENT_TIME)) { + if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) { ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); } else { ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5); } } @Override - public void onTimer( - long timestamp, - OnTimerContext ctx, - Collector out) throws Exception { - - assertEquals(this.timeDomain, ctx.timeDomain()); + public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { + assertEquals(EXPECTED_KEY, ctx.getCurrentKey()); + assertEquals(expectedTimeDomain, ctx.timeDomain()); out.collect(1777); } } - private static class TriggeringStatefulFlatMapFunction extends ProcessFunction { + private static class TriggeringStatefulFlatMapFunction extends KeyedProcessFunction { private static final long serialVersionUID = 1L; private final ValueStateDescriptor state = new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE); - private final TimeDomain timeDomain; + private final TimeDomain expectedTimeDomain; public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) { - this.timeDomain = timeDomain; + this.expectedTimeDomain = timeDomain; } @Override public void processElement(Integer value, Context ctx, Collector out) throws Exception { out.collect("INPUT:" + value); getRuntimeContext().getState(state).update(value); - if (timeDomain.eq
[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r169060097 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java --- @@ -377,119 +375,114 @@ public T getKey(T value) throws Exception { } } - private static class QueryingFlatMapFunction extends ProcessFunction { + private static class QueryingFlatMapFunction extends KeyedProcessFunction { private static final long serialVersionUID = 1L; - private final TimeDomain timeDomain; + private final TimeDomain expectedTimeDomain; public QueryingFlatMapFunction(TimeDomain timeDomain) { - this.timeDomain = timeDomain; + this.expectedTimeDomain = timeDomain; } @Override public void processElement(Integer value, Context ctx, Collector out) throws Exception { - if (timeDomain.equals(TimeDomain.EVENT_TIME)) { + if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) { out.collect(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()); } else { out.collect(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()); } } @Override - public void onTimer( - long timestamp, - OnTimerContext ctx, - Collector out) throws Exception { + public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { + // Do nothing } } - private static class TriggeringFlatMapFunction extends ProcessFunction { + private static class TriggeringFlatMapFunction extends KeyedProcessFunction { private static final long serialVersionUID = 1L; - private final TimeDomain timeDomain; + private final TimeDomain expectedTimeDomain; + + static final Integer EXPECTED_KEY = 17; --- End diff -- As in the second PR: does this have to be a static field? Or can we initialise it in the constructor? I think it should work with the `expectedKey` set in the constructor as long as this is not an `ITCase` - and it's not, it's using test harness. ---
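The suggestion in the comment above, passing the expected key through the constructor instead of a static field, could look roughly like the following. This is a minimal plain-Java sketch, not the code from the PR: `ExpectedKeySketch`, `TimerContext` and `TriggeringFunction` are hypothetical stand-ins so the snippet compiles without Flink, and only the method name `getCurrentKey()` is taken from the diff.

```java
import java.util.Objects;

/** Hypothetical sketch; none of these types are Flink classes. */
class ExpectedKeySketch {

    /** Stand-in for the timer context; only getCurrentKey() mirrors the PR. */
    interface TimerContext<K> {
        K getCurrentKey();
    }

    /** Test function that receives its expectation through the constructor. */
    static class TriggeringFunction {
        // Instance field instead of a static constant, as the review suggests.
        private final Integer expectedKey;

        TriggeringFunction(Integer expectedKey) {
            this.expectedKey = Objects.requireNonNull(expectedKey);
        }

        /** Returns normally if the timer fired for the expected key, throws otherwise. */
        void onTimer(long timestamp, TimerContext<Integer> ctx) {
            if (!expectedKey.equals(ctx.getCurrentKey())) {
                throw new AssertionError(
                    "expected key " + expectedKey + " but was " + ctx.getCurrentKey());
            }
        }
    }

    public static void main(String[] args) {
        TriggeringFunction fn = new TriggeringFunction(17);
        fn.onTimer(5L, () -> 17); // key matches, so no exception is thrown
        System.out.println("key assertion passed");
    }
}
```

With the expectation held per instance, two tests can use different keys without sharing mutable or global state.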
[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r168265503 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala --- @@ -162,7 +162,7 @@ class GroupAggProcessFunction( override def onTimer( timestamp: Long, - ctx: ProcessFunction[CRow, CRow]#OnTimerContext, + ctx: ProcessFunction[CRow, CRow]#OnTimerContext[_], --- End diff -- Yes, I think I'll create a `KeyedProcessFunction` as Aljoscha suggested, and we probably won't need these Scala changes any more. ---
[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r168122242 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala --- @@ -162,7 +162,7 @@ class GroupAggProcessFunction( override def onTimer( timestamp: Long, - ctx: ProcessFunction[CRow, CRow]#OnTimerContext, + ctx: ProcessFunction[CRow, CRow]#OnTimerContext[_], --- End diff -- I'm not a Scala expert, but is this change somehow related to adding the `getCurrentKey()` method? ---
[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r168121129 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java --- @@ -397,17 +396,16 @@ public void processElement(Integer value, Context ctx, Collector out) th } @Override - public void onTimer( - long timestamp, - OnTimerContext ctx, - Collector out) throws Exception { + public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { } } private static class TriggeringFlatMapFunction extends ProcessFunction { private static final long serialVersionUID = 1L; + static final int TEST_VALUE = 17; + private final TimeDomain timeDomain; public TriggeringFlatMapFunction(TimeDomain timeDomain) { --- End diff -- Rename `timeDomain` to `expectedTimeDomain`, add an `expectedKey` field, and add an assertion on the expected key to the `onTimer()` method, which is triggered in both `KeyedProcessOperatorTest#testEventTimeTimers` and `KeyedProcessOperatorTest#testProcessingTimeTimers`. ---
[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...
GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/5481 [FLINK-8560] Access to the current key in ProcessFunction after keyBy()

## What is the purpose of the change

Currently, the key of a keyBy() has to be stored in the processElement method in order to have access to it in the OnTimerContext. This is inconvenient because processElement has to check for every element whether the key is already stored, and set it if it is not. A possible solution would be adding OnTimerContext#getCurrentKey() or a similar method. Making the key available in the open() method might work as well.

## Brief change log

Added `OnTimerContext#getCurrentKey()`.

One limitation is that this implementation of `getCurrentKey()` is currently not strongly typed. Declaring the key's type requires adding a new generic type to `ProcessFunction`, changing the declaration from `ProcessFunction<I, O>` to `ProcessFunction<K, I, O>`. I'm worried that this may break users' applications, so I decided to make `getCurrentKey()` return an `Object`. I'd like to discuss the feasibility of strong typing.

## Verifying this change

This change is already covered by existing tests, such as *KeyedProcessOperatorTest*.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs / JavaDocs)

You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-8560

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5481.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5481

commit c5b8a4f27094b88c8641e2bdd30ea0ca65a7a4be Author: Bowen Li Date: 2018-02-13T06:33:06Z [FLINK-8560] Access to the current key in ProcessFunction after keyBy() ---
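To illustrate the motivation in the PR description, here is a minimal plain-Java model of a keyed operator whose timer callback reads the key from its context instead of caching it during element processing. It is a sketch under stated assumptions and not Flink code: `KeyedTimerSketch`, `TimerContext`, `KeyedFunction`, `run` and `demo` are hypothetical names, the timer handling is deliberately simplified to one timer per key, and only the method name `getCurrentKey()` comes from the PR. The context is typed on the key here; with an `Object` return type, as in the first version of the PR, the caller would have to cast.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a keyed operator with timers; hypothetical names, not Flink API. */
class KeyedTimerSketch {

    /** Stand-in for the timer context, typed on the key. */
    interface TimerContext<K> {
        K getCurrentKey();
    }

    /** Stand-in for a keyed function: K = key, I = input, O = output. */
    interface KeyedFunction<K, I, O> {
        void processElement(I value, List<O> out);

        void onTimer(long timestamp, TimerContext<K> ctx, List<O> out);
    }

    /** Processes one element per key, registers one timer per key, then fires the timers in order. */
    static <K, I, O> List<O> run(Map<K, I> elementsByKey, KeyedFunction<K, I, O> fn) {
        List<O> out = new ArrayList<>();
        TreeMap<Long, K> timers = new TreeMap<>();
        long now = 0;
        for (Map.Entry<K, I> e : elementsByKey.entrySet()) {
            fn.processElement(e.getValue(), out);
            timers.put(now + 5, e.getKey()); // the operator, not the function, remembers the key
            now++;
        }
        for (Map.Entry<Long, K> t : timers.entrySet()) {
            K key = t.getValue();
            fn.onTimer(t.getKey(), () -> key, out);
        }
        return out;
    }

    /** Runs the model on two keyed elements and returns the collected output. */
    static List<String> demo() {
        Map<String, Integer> input = new LinkedHashMap<>();
        input.put("a", 1);
        input.put("b", 2);
        return run(input, new KeyedFunction<String, Integer, String>() {
            @Override
            public void processElement(Integer value, List<String> out) {
                out.add("INPUT:" + value); // no need to store the key here
            }

            @Override
            public void onTimer(long timestamp, TimerContext<String> ctx, List<String> out) {
                out.add("TIMER:" + ctx.getCurrentKey()); // typed access, no cast
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [INPUT:1, INPUT:2, TIMER:a, TIMER:b]
    }
}
```

The point of the design is that the operator already knows which key a timer belongs to, so exposing it on the context removes the per-element bookkeeping described above.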