[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-03-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5481


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-03-05 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r172136142
  
--- Diff: docs/dev/stream/operators/process_function.md ---
@@ -242,4 +242,17 @@ class CountWithTimeoutFunction extends 
ProcessFunction[(String, String), (String
 the current processing time as event-time timestamp. This behavior is very 
subtle and might not be noticed by users. Well, it's
 harmful because processing-time timestamps are indeterministic and not 
aligned with watermarks. Besides, user-implemented logic
 depends on this wrong timestamp highly likely is unintendedly faulty. So 
we've decided to fix it. Upon upgrading to 1.4.0, Flink jobs
-that are using this incorrect event-time timestamp will fail, and users 
should adapt their jobs to the correct logic.
\ No newline at end of file
+that are using this incorrect event-time timestamp will fail, and users 
should adapt their jobs to the correct logic.
+
+## The KeyedProcessFunction
+
+`KeyedProcessFunction`, as an extension of `ProcessFunction`, gives access 
to the key of timers in its `onTimer(...)`
+method.
+
+{% highlight java %}
--- End diff --

@bowenli86 As soon as the scala example is added, I can take care of the 
other two comments and merge! Let me know when you update the PR, and thanks 
for the work!


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-03-05 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r172135390
  
--- Diff: docs/dev/stream/operators/process_function.md ---
@@ -242,4 +242,17 @@ class CountWithTimeoutFunction extends 
ProcessFunction[(String, String), (String
 the current processing time as event-time timestamp. This behavior is very 
subtle and might not be noticed by users. Well, it's
 harmful because processing-time timestamps are indeterministic and not 
aligned with watermarks. Besides, user-implemented logic
 depends on this wrong timestamp highly likely is unintendedly faulty. So 
we've decided to fix it. Upon upgrading to 1.4.0, Flink jobs
-that are using this incorrect event-time timestamp will fail, and users 
should adapt their jobs to the correct logic.
\ No newline at end of file
+that are using this incorrect event-time timestamp will fail, and users 
should adapt their jobs to the correct logic.
+
+## The KeyedProcessFunction
+
+`KeyedProcessFunction`, as an extension of `ProcessFunction`, gives access 
to the key of timers in its `onTimer(...)`
+method.
+
+{% highlight java %}
--- End diff --

Maybe also add Scala example code.


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-03-05 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r172135104
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
 ---
@@ -70,21 +69,15 @@ public void open() throws Exception {
@Override
public void onEventTime(InternalTimer timer) throws 
Exception {
collector.setAbsoluteTimestamp(timer.getTimestamp());
-   onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
-   onTimerContext.timer = timer;
-   userFunction.onTimer(timer.getTimestamp(), onTimerContext, 
collector);
-   onTimerContext.timeDomain = null;
-   onTimerContext.timer = null;
+   reinitialize(userFunction, TimeDomain.EVENT_TIME, timer);
--- End diff --

Hate to be picky, but I think the name is a bit misleading and we could 
probably put all of this in a method `invokeUserTime()` that does what 
`reinitialise()` and `reset()` do.

@kl0u I think you can quickly fix that when merging.


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-03-05 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r172133573
  
--- Diff: docs/dev/stream/operators/process_function.md ---
@@ -242,4 +242,17 @@ class CountWithTimeoutFunction extends 
ProcessFunction[(String, String), (String
 the current processing time as event-time timestamp. This behavior is very 
subtle and might not be noticed by users. Well, it's
 harmful because processing-time timestamps are indeterministic and not 
aligned with watermarks. Besides, user-implemented logic
 depends on this wrong timestamp highly likely is unintendedly faulty. So 
we've decided to fix it. Upon upgrading to 1.4.0, Flink jobs
-that are using this incorrect event-time timestamp will fail, and users 
should adapt their jobs to the correct logic.
\ No newline at end of file
+that are using this incorrect event-time timestamp will fail, and users 
should adapt their jobs to the correct logic.
+
+## The KeyedProcessFunction
+
+`KeyedProcessFunction`, as an extension of `ProcessFunction`, gives access 
to the key of timers in its `onTimer(...)`
+method.
+
+{% highlight java %}
+@Override
+public void onTimer(long timestamp, OnTimerContext ctx, Collector 
out) throws Exception {
--- End diff --

I believe this is now `public void onTimer(long timestamp, 
OnTimerContext ctx, Collector out)`, right? @kl0u you could fix this 
while merging.


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-03-01 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171566773
  
--- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
 ---
@@ -66,9 +66,9 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, 
K]) extends DataStream[T]
 * function, this function can also query the time and set timers. When 
reacting to the firing
 * of set timers the function can directly emit elements and/or 
register yet more timers.
 *
-* @param processFunction The [[ProcessFunction]] that is called for 
each element
-*   in the stream.
+* @param processFunction The [[ProcessFunction]] that is called for 
each element in the stream.
--- End diff --

Please also add that the user now should use the new `KeyedProcessFunction` 
instead.


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-28 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171233924
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 ---
@@ -321,19 +324,80 @@ private boolean 
validateKeyTypeIsHashable(TypeInformation type) {
 * @param  The type of elements emitted by the {@code 
ProcessFunction}.
 *
 * @return The transformed {@link DataStream}.
+*
+* @deprecated Use {@link KeyedStream#process(KeyedProcessFunction, 
TypeInformation)}
 */
+   @Deprecated
@Override
@Internal
public  SingleOutputStreamOperator process(
ProcessFunction processFunction,
TypeInformation outputType) {
 
-   KeyedProcessOperator operator =
-   new 
KeyedProcessOperator<>(clean(processFunction));
+   LegacyKeyedProcessOperator operator = new 
LegacyKeyedProcessOperator<>(clean(processFunction));
 
return transform("Process", outputType, operator);
}
 
+   /**
+* Applies the given {@link KeyedProcessFunction} on the input stream, 
thereby creating a transformed output stream.
+*
+* The function will be called for every element in the input 
streams and can produce zero
+* or more output elements. Contrary to the {@link 
DataStream#flatMap(FlatMapFunction)}
+* function, this function can also query the time and set timers. When 
reacting to the firing
+* of set timers the function can directly emit elements and/or 
register yet more timers.
+*
+* @param keyedProcessFunction The {@link KeyedProcessFunction} that is 
called for each element in the stream.
+*
+* @param  The type of key in {@code KeyedProcessFunction}.
+*
+* @param  The type of elements emitted by the {@code 
KeyedProcessFunction}.
+*
+* @return The transformed {@link DataStream}.
+*/
+   @PublicEvolving
+   public  SingleOutputStreamOperator 
process(KeyedProcessFunction keyedProcessFunction) {
+
--- End diff --

The signature should be: 
```public  SingleOutputStreamOperator 
process(KeyedProcessFunction keyedProcessFunction) {```

You do not have to re-define the type of the key (the `` in the 
beginning) as we are already in a `KeyedStream` with an already defined type of 
key.

Also remove the corresponding part in the `javadoc`.


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-28 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171237037
  
--- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
 ---
@@ -79,7 +78,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, 
K]) extends DataStream[T]
 
 asScalaStream(javaStream.process(processFunction, 
implicitly[TypeInformation[R]]))
   }
-  
+
+  /**
+   * Applies the given [[KeyedProcessFunction]] on the input stream, 
thereby
+   * creating a transformed output stream.
+   *
+   * The function will be called for every element in the stream and can 
produce
+   * zero or more output. The function can also query the time and set 
timers. When
+   * reacting to the firing of set timers the function can emit yet more 
elements.
+   *
+   * The function will be called for every element in the input streams 
and can produce zero
+   * or more output elements. Contrary to the 
[[DataStream#flatMap(FlatMapFunction)]]
+   * function, this function can also query the time and set timers. When 
reacting to the firing
+   * of set timers the function can directly emit elements and/or register 
yet more timers.
+   *
+   * @param keyedProcessFunction The [[KeyedProcessFunction]] that is 
called for each element
+   * in the stream.
+   */
+  @PublicEvolving
+  def process[K, R: TypeInformation](
--- End diff --

As in java, you do not need to redefine the `K` here. So you can remove it 
`def process[R: TypeInformation](`...


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-28 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171234582
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 ---
@@ -321,19 +324,80 @@ private boolean 
validateKeyTypeIsHashable(TypeInformation type) {
 * @param  The type of elements emitted by the {@code 
ProcessFunction}.
 *
 * @return The transformed {@link DataStream}.
+*
+* @deprecated Use {@link KeyedStream#process(KeyedProcessFunction, 
TypeInformation)}
 */
+   @Deprecated
@Override
@Internal
public  SingleOutputStreamOperator process(
ProcessFunction processFunction,
TypeInformation outputType) {
 
-   KeyedProcessOperator operator =
-   new 
KeyedProcessOperator<>(clean(processFunction));
+   LegacyKeyedProcessOperator operator = new 
LegacyKeyedProcessOperator<>(clean(processFunction));
 
return transform("Process", outputType, operator);
}
 
+   /**
+* Applies the given {@link KeyedProcessFunction} on the input stream, 
thereby creating a transformed output stream.
+*
+* The function will be called for every element in the input 
streams and can produce zero
+* or more output elements. Contrary to the {@link 
DataStream#flatMap(FlatMapFunction)}
+* function, this function can also query the time and set timers. When 
reacting to the firing
+* of set timers the function can directly emit elements and/or 
register yet more timers.
+*
+* @param keyedProcessFunction The {@link KeyedProcessFunction} that is 
called for each element in the stream.
+*
+* @param  The type of key in {@code KeyedProcessFunction}.
+*
+* @param  The type of elements emitted by the {@code 
KeyedProcessFunction}.
+*
+* @return The transformed {@link DataStream}.
+*/
+   @PublicEvolving
+   public  SingleOutputStreamOperator 
process(KeyedProcessFunction keyedProcessFunction) {
+
+   TypeInformation outType = 
TypeExtractor.getUnaryOperatorReturnType(
+   keyedProcessFunction,
+   KeyedProcessFunction.class,
+   0,
+   1,
+   TypeExtractor.NO_INDEX,
+   TypeExtractor.NO_INDEX,
+   getType(),
+   Utils.getCallLocationName(),
+   true);
+
+   return process(keyedProcessFunction, outType);
+   }
+
+   /**
+* Applies the given {@link KeyedProcessFunction} on the input stream, 
thereby creating a transformed output stream.
+*
+* The function will be called for every element in the input 
streams and can produce zero
+* or more output elements. Contrary to the {@link 
DataStream#flatMap(FlatMapFunction)}
+* function, this function can also query the time and set timers. When 
reacting to the firing
+* of set timers the function can directly emit elements and/or 
register yet more timers.
+*
+* @param keyedProcessFunction The {@link KeyedProcessFunction} that is 
called for each element in the stream.
+*
+* @param outputType {@link TypeInformation} for the result type of the 
function.
+*
+* @param  The type of key in {@code KeyedProcessFunction}.
+*
+* @param  The type of elements emitted by the {@code 
KeyedProcessFunction}.
+*
+* @return The transformed {@link DataStream}.
+*/
+   @Internal
+   public  SingleOutputStreamOperator process(
+   KeyedProcessFunction keyedProcessFunction,
--- End diff --

Same here, you do not have to redefine the type of the key.


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-28 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171235356
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * A keyed function that processes elements of a stream.
+ *
+ * For every element in the input stream {@link #processElement(Object, 
Context, Collector)}
+ * is invoked. This can produce zero or more elements as output. 
Implementations can also
+ * query the time and set timers through the provided {@link Context}. For 
firing timers
+ * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This 
can again produce
+ * zero or more elements as output and register further timers.
+ *
+ * NOTE: Access to keyed state and timers (which are also scoped 
to a key) is only
+ * available if the {@code KeyedProcessFunction} is applied on a {@code 
KeyedStream}.
+ *
+ * NOTE: A {@code KeyedProcessFunction} is always a
+ * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, 
access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} is always 
available and setup and
+ * teardown methods can be implemented. See
+ * {@link 
org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
+ *
+ * @param  Type of the key.
+ * @param  Type of the input elements.
+ * @param  Type of the output elements.
+ */
+@PublicEvolving
+public abstract class KeyedProcessFunction extends 
AbstractRichFunction {
+
+   private static final long serialVersionUID = 1L;
+
+   /**
+* Process one element from the input stream.
+*
+* This function can output zero or more elements using the {@link 
Collector} parameter
+* and also update internal state or set timers using the {@link 
Context} parameter.
+*
+* @param value The input value.
+* @param ctx A {@link Context} that allows querying the timestamp of 
the element and getting
+*a {@link TimerService} for registering timers and 
querying the time. The
+*context is only valid during the invocation of this 
method, do not store it.
+* @param out The collector for returning result values.
+*
+* @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+*   to fail and may trigger recovery.
+*/
+   public abstract void processElement(I value, Context ctx, Collector 
out) throws Exception;
+
+   /**
+* Called when a timer set using {@link TimerService} fires.
+*
+* @param timestamp The timestamp of the firing timer.
+* @param ctx An {@link OnTimerContext} that allows querying the 
timestamp, the {@link TimeDomain}, and the key
+*of the firing timer and getting a {@link TimerService} 
for registering timers and querying the time.
+*The context is only valid during the invocation of this 
method, do not store it.
+* @param out The collector for returning result values.
+*
+* @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+*   to fail and may trigger recovery.
+*/
+   public void onTimer(long timestamp, OnTimerContext ctx, Collector 
out) throws Exception {}
+
--- End diff --

 

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-28 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171236298
  
--- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
 ---
@@ -54,21 +54,20 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, 
K]) extends DataStream[T]
   // 

 
   /**
-* Applies the given [[ProcessFunction]] on the input stream, thereby
-* creating a transformed output stream.
-*
-* The function will be called for every element in the stream and can 
produce
-* zero or more output. The function can also query the time and set 
timers. When
-* reacting to the firing of set timers the function can emit yet more 
elements.
-*
-* The function will be called for every element in the input streams 
and can produce zero
-* or more output elements. Contrary to the 
[[DataStream#flatMap(FlatMapFunction)]]
-* function, this function can also query the time and set timers. When 
reacting to the firing
-* of set timers the function can directly emit elements and/or 
register yet more timers.
-*
-* @param processFunction The [[ProcessFunction]] that is called for 
each element
-*   in the stream.
-*/
+   * Applies the given [[ProcessFunction]] on the input stream, thereby
+   * creating a transformed output stream.
+   *
+   * The function will be called for every element in the stream and can 
produce
+   * zero or more output. The function can also query the time and set 
timers. When
+   * reacting to the firing of set timers the function can emit yet more 
elements.
+   *
+   * The function will be called for every element in the input streams 
and can produce zero
+   * or more output elements. Contrary to the 
[[DataStream#flatMap(FlatMapFunction)]]
+   * function, this function can also query the time and set timers. When 
reacting to the firing
+   * of set timers the function can directly emit elements and/or register 
yet more timers.
+   *
+   * @param processFunction The [[ProcessFunction]] that is called for 
each element in the stream.
+   */
--- End diff --

Revert all reformatings (indent by 1 space) and also add the `@deprecated` 
annotation with the correct, non-deprecated alternative, as done in the 
corresponding `java` class.


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-28 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171237136
  
--- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
 ---
@@ -79,7 +78,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, 
K]) extends DataStream[T]
 
 asScalaStream(javaStream.process(processFunction, 
implicitly[TypeInformation[R]]))
   }
-  
+
+  /**
+   * Applies the given [[KeyedProcessFunction]] on the input stream, 
thereby
+   * creating a transformed output stream.
+   *
+   * The function will be called for every element in the stream and can 
produce
+   * zero or more output. The function can also query the time and set 
timers. When
+   * reacting to the firing of set timers the function can emit yet more 
elements.
+   *
+   * The function will be called for every element in the input streams 
and can produce zero
+   * or more output elements. Contrary to the 
[[DataStream#flatMap(FlatMapFunction)]]
+   * function, this function can also query the time and set timers. When 
reacting to the firing
+   * of set timers the function can directly emit elements and/or register 
yet more timers.
+   *
+   * @param keyedProcessFunction The [[KeyedProcessFunction]] that is 
called for each element
+   * in the stream.
+   */
+  @PublicEvolving
+  def process[K, R: TypeInformation](
+keyedProcessFunction: KeyedProcessFunction[K, T, R]): DataStream[R] = {
+
+if (keyedProcessFunction == null) {
+  throw new NullPointerException("ProcessFunction must not be null.")
+}
--- End diff --

The message now should be `"KeyedProcessFunction must not be null."`


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-28 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171233273
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 ---
@@ -84,18 +86,18 @@
  * elements that have the same key.
  *
  * @param  The type of the elements in the Keyed Stream.
- * @param  The type of the key in the Keyed Stream.
+ * @param  The type of the key in the Keyed Stream.
--- End diff --

Revert the renaming from `KEY` to `K`.


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-28 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171235653
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
 ---
@@ -132,15 +139,15 @@ public TimerService timerService() {
}
}
 
-   private class OnTimerContextImpl extends ProcessFunction.OnTimerContext{
+   private class OnTimerContextImpl extends KeyedProcessFunction.OnTimerContext {
 
--- End diff --

With the proposed changes you can also remove the `` in the 
`.OnTimerContext`.


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-28 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171234411
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 ---
@@ -321,19 +324,80 @@ private boolean 
validateKeyTypeIsHashable(TypeInformation type) {
 * @param  The type of elements emitted by the {@code 
ProcessFunction}.
 *
 * @return The transformed {@link DataStream}.
+*
+* @deprecated Use {@link KeyedStream#process(KeyedProcessFunction, 
TypeInformation)}
 */
+   @Deprecated
@Override
@Internal
public  SingleOutputStreamOperator process(
ProcessFunction processFunction,
TypeInformation outputType) {
 
-   KeyedProcessOperator operator =
-   new 
KeyedProcessOperator<>(clean(processFunction));
+   LegacyKeyedProcessOperator operator = new 
LegacyKeyedProcessOperator<>(clean(processFunction));
 
return transform("Process", outputType, operator);
}
 
+   /**
+* Applies the given {@link KeyedProcessFunction} on the input stream, 
thereby creating a transformed output stream.
+*
+* The function will be called for every element in the input 
streams and can produce zero
+* or more output elements. Contrary to the {@link 
DataStream#flatMap(FlatMapFunction)}
+* function, this function can also query the time and set timers. When 
reacting to the firing
+* of set timers the function can directly emit elements and/or 
register yet more timers.
+*
+* @param keyedProcessFunction The {@link KeyedProcessFunction} that is 
called for each element in the stream.
+*
+* @param  The type of key in {@code KeyedProcessFunction}.
+*
+* @param  The type of elements emitted by the {@code 
KeyedProcessFunction}.
+*
+* @return The transformed {@link DataStream}.
+*/
+   @PublicEvolving
+   public  SingleOutputStreamOperator 
process(KeyedProcessFunction keyedProcessFunction) {
+
+   TypeInformation outType = 
TypeExtractor.getUnaryOperatorReturnType(
+   keyedProcessFunction,
+   KeyedProcessFunction.class,
+   0,
--- End diff --

The indices here are not `0` and `1` for input and output type, but `1` and 
`2`. In the process function it was 0 and 1 because we did not have the key.


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-28 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171235118
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * A keyed function that processes elements of a stream.
+ *
+ * For every element in the input stream {@link #processElement(Object, 
Context, Collector)}
+ * is invoked. This can produce zero or more elements as output. 
Implementations can also
+ * query the time and set timers through the provided {@link Context}. For 
firing timers
+ * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This 
can again produce
+ * zero or more elements as output and register further timers.
+ *
+ * NOTE: Access to keyed state and timers (which are also scoped 
to a key) is only
+ * available if the {@code KeyedProcessFunction} is applied on a {@code 
KeyedStream}.
+ *
+ * NOTE: A {@code KeyedProcessFunction} is always a
+ * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, 
access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} is always 
available and setup and
+ * teardown methods can be implemented. See
+ * {@link 
org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
+ *
+ * @param  Type of the key.
+ * @param  Type of the input elements.
+ * @param  Type of the output elements.
+ */
+@PublicEvolving
+public abstract class KeyedProcessFunction extends 
AbstractRichFunction {
+
+   private static final long serialVersionUID = 1L;
+
+   /**
+* Process one element from the input stream.
+*
+* This function can output zero or more elements using the {@link 
Collector} parameter
+* and also update internal state or set timers using the {@link 
Context} parameter.
+*
+* @param value The input value.
+* @param ctx A {@link Context} that allows querying the timestamp of 
the element and getting
+*a {@link TimerService} for registering timers and 
querying the time. The
+*context is only valid during the invocation of this 
method, do not store it.
+* @param out The collector for returning result values.
+*
+* @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+*   to fail and may trigger recovery.
+*/
+   public abstract void processElement(I value, Context ctx, Collector 
out) throws Exception;
+
+   /**
+* Called when a timer set using {@link TimerService} fires.
+*
+* @param timestamp The timestamp of the firing timer.
+* @param ctx An {@link OnTimerContext} that allows querying the 
timestamp, the {@link TimeDomain}, and the key
+*of the firing timer and getting a {@link TimerService} 
for registering timers and querying the time.
+*The context is only valid during the invocation of this 
method, do not store it.
+* @param out The collector for returning result values.
+*
+* @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+*   to fail and may trigger recovery.
+*/
+   public void onTimer(long timestamp, OnTimerContext ctx, Collector 
out) throws Exception {}
+
+   /**
+* Inf

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-21 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r169571398
  
--- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
 ---
@@ -79,7 +78,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, 
K]) extends DataStream[T]
 
 asScalaStream(javaStream.process(processFunction, 
implicitly[TypeInformation[R]]))
   }
-  
+
+  /**
+* Applies the given [[KeyedProcessFunction]] on the input stream, 
thereby
--- End diff --

ditto


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-21 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r169573342
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 ---
@@ -654,6 +648,63 @@ public ExecutionConfig getExecutionConfig() {
return transform("Process", outputType, operator);
}
 
+   /**
+* Applies the given {@link KeyedProcessFunction} on the input stream, 
thereby creating a transformed output stream.
+*
+* The function will be called for every element in the input 
streams and can produce zero
+* or more output elements.
+*
+* @param keyedProcessFunction The {@link ProcessFunction} that is 
called for each element in the stream.
+*
+* @param  The type of key in {@code KeyedProcessFunction}.
+*
+* @param  The type of elements emitted by the {@code 
KeyedProcessFunction}.
+*
+* @return The transformed {@link DataStream}.
+*/
+   @PublicEvolving
+   public  SingleOutputStreamOperator 
process(KeyedProcessFunction keyedProcessFunction) {
--- End diff --

Does it make sense to add `process` with a `KeyedProcessFunction` on non 
keyed `DataStream`?


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-21 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r169571436
  
--- Diff: 
flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
 ---
@@ -448,11 +447,35 @@ class DataStreamTest extends AbstractTestBase {
 val flatMapped = src.keyBy(x => x).process(processFunction)
 
 assert(processFunction == getFunctionForDataStream(flatMapped))
+
assert(getOperatorForDataStream(flatMapped).isInstanceOf[LegacyKeyedProcessOperator[_,
 _, _]])
+  }
+
+  /**
+* Verify that a [[KeyedStream.process(KeyedProcessFunction)]] call is 
correctly
--- End diff --

ditto


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-21 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r169571457
  
--- Diff: 
flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
 ---
@@ -473,6 +496,28 @@ class DataStreamTest extends AbstractTestBase {
 
assert(getOperatorForDataStream(flatMapped).isInstanceOf[ProcessOperator[_, _]])
   }
 
+  /**
+* Verify that a [[DataStream.process(KeyedProcessFunction)]] call is 
correctly
--- End diff --

ditto


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-21 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r169571197
  
--- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 ---
@@ -686,6 +686,27 @@ class DataStream[T](stream: JavaStream[T]) {
 asScalaStream(javaStream.process(processFunction, 
implicitly[TypeInformation[R]]))
   }
 
+  /**
+* Applies the given [[KeyedProcessFunction]] on the input stream, 
thereby
--- End diff --

ditto


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-21 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r169571208
  
--- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 ---
@@ -666,18 +666,18 @@ class DataStream[T](stream: JavaStream[T]) {
   }
 
   /**
-   * Applies the given [[ProcessFunction]] on the input stream, thereby
-   * creating a transformed output stream.
-   *
-   * The function will be called for every element in the stream and can 
produce
-   * zero or more output.
-   *
-   * @param processFunction The [[ProcessFunction]] that is called for 
each element
-   *   in the stream.
-   */
+* Applies the given [[ProcessFunction]] on the input stream, thereby
--- End diff --

Can you revert this formatting? I think proper java docs should be as this 
was before:
```
/**
 *
 */
```
instead of:
```
/**
  *
  */
```


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-19 Thread juergenthomann
Github user juergenthomann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r169077241
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 ---
@@ -654,6 +648,63 @@ public ExecutionConfig getExecutionConfig() {
return transform("Process", outputType, operator);
}
 
+   /**
+* Applies the given {@link KeyedProcessFunction} on the input stream, 
thereby creating a transformed output stream.
+*
+* The function will be called for every element in the input 
streams and can produce zero
+* or more output elements.
+*
+* @param keyedProcessFunction The {@link ProcessFunction} that is 
called for each element in the stream.
+*
+* @param  The type of key in {@code KeyedProcessFunction}.
+*
+* @param  The type of elements emitted by the {@code 
PKeyedProcessFunction}.
--- End diff --

Could this be a typo with **P**KeyedProcessFunction instead of 
KeyedProcessFunction?


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-19 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r169060125
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
 ---
@@ -377,119 +375,114 @@ public T getKey(T value) throws Exception {
}
}
 
-   private static class QueryingFlatMapFunction extends 
ProcessFunction {
+   private static class QueryingFlatMapFunction extends 
KeyedProcessFunction {
 
private static final long serialVersionUID = 1L;
 
-   private final TimeDomain timeDomain;
+   private final TimeDomain expectedTimeDomain;
 
public QueryingFlatMapFunction(TimeDomain timeDomain) {
-   this.timeDomain = timeDomain;
+   this.expectedTimeDomain = timeDomain;
}
 
@Override
public void processElement(Integer value, Context ctx, 
Collector out) throws Exception {
-   if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+   if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) {
out.collect(value + "TIME:" + 
ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
} else {
out.collect(value + "TIME:" + 
ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
}
}
 
@Override
-   public void onTimer(
-   long timestamp,
-   OnTimerContext ctx,
-   Collector out) throws Exception {
+   public void onTimer(long timestamp, OnTimerContext 
ctx, Collector out) throws Exception {
+   // Do nothing
}
}
 
-   private static class TriggeringFlatMapFunction extends 
ProcessFunction {
+   private static class TriggeringFlatMapFunction extends 
KeyedProcessFunction {
 
private static final long serialVersionUID = 1L;
 
-   private final TimeDomain timeDomain;
+   private final TimeDomain expectedTimeDomain;
+
+   static final Integer EXPECTED_KEY = 17;
 
public TriggeringFlatMapFunction(TimeDomain timeDomain) {
-   this.timeDomain = timeDomain;
+   this.expectedTimeDomain = timeDomain;
}
 
@Override
public void processElement(Integer value, Context ctx, 
Collector out) throws Exception {
out.collect(value);
-   if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+   if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) {

ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() 
+ 5);
} else {

ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + 5);
}
}
 
@Override
-   public void onTimer(
-   long timestamp,
-   OnTimerContext ctx,
-   Collector out) throws Exception {
-
-   assertEquals(this.timeDomain, ctx.timeDomain());
+   public void onTimer(long timestamp, OnTimerContext 
ctx, Collector out) throws Exception {
+   assertEquals(EXPECTED_KEY, ctx.getCurrentKey());
+   assertEquals(expectedTimeDomain, ctx.timeDomain());
out.collect(1777);
}
}
 
-   private static class TriggeringStatefulFlatMapFunction extends 
ProcessFunction {
+   private static class TriggeringStatefulFlatMapFunction extends 
KeyedProcessFunction {
 
private static final long serialVersionUID = 1L;
 
private final ValueStateDescriptor state =
new ValueStateDescriptor<>("seen-element", 
IntSerializer.INSTANCE);
 
-   private final TimeDomain timeDomain;
+   private final TimeDomain expectedTimeDomain;
 
public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) 
{
-   this.timeDomain = timeDomain;
+   this.expectedTimeDomain = timeDomain;
}
 
@Override
public void processElement(Integer value, Context ctx, 
Collector out) throws Exception {
out.collect("INPUT:" + value);
getRuntimeContext().getState(state).update(value);
-   if (timeDomain.eq

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-19 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r169060097
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
 ---
@@ -377,119 +375,114 @@ public T getKey(T value) throws Exception {
}
}
 
-   private static class QueryingFlatMapFunction extends 
ProcessFunction {
+   private static class QueryingFlatMapFunction extends 
KeyedProcessFunction {
 
private static final long serialVersionUID = 1L;
 
-   private final TimeDomain timeDomain;
+   private final TimeDomain expectedTimeDomain;
 
public QueryingFlatMapFunction(TimeDomain timeDomain) {
-   this.timeDomain = timeDomain;
+   this.expectedTimeDomain = timeDomain;
}
 
@Override
public void processElement(Integer value, Context ctx, 
Collector out) throws Exception {
-   if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+   if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) {
out.collect(value + "TIME:" + 
ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
} else {
out.collect(value + "TIME:" + 
ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
}
}
 
@Override
-   public void onTimer(
-   long timestamp,
-   OnTimerContext ctx,
-   Collector out) throws Exception {
+   public void onTimer(long timestamp, OnTimerContext 
ctx, Collector out) throws Exception {
+   // Do nothing
}
}
 
-   private static class TriggeringFlatMapFunction extends 
ProcessFunction {
+   private static class TriggeringFlatMapFunction extends 
KeyedProcessFunction {
 
private static final long serialVersionUID = 1L;
 
-   private final TimeDomain timeDomain;
+   private final TimeDomain expectedTimeDomain;
+
+   static final Integer EXPECTED_KEY = 17;
--- End diff --

As in the second PR: does this have to be a static field? Or can we 
initialise it in the constructor? I think it should work with the `expectedKey` 
set in the constructor as long as this is not an `ITCase` - and it's not, it's 
using test harness. 


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-14 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r168265503
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -162,7 +162,7 @@ class GroupAggProcessFunction(
 
   override def onTimer(
   timestamp: Long,
-  ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+  ctx: ProcessFunction[CRow, CRow]#OnTimerContext[_],
--- End diff --

yes, I think I'll create a `KeyedProcessFunction` as Aljoscha suggested, 
and we probably don't need these scala changes any more.


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-14 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r168122242
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -162,7 +162,7 @@ class GroupAggProcessFunction(
 
   override def onTimer(
   timestamp: Long,
-  ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+  ctx: ProcessFunction[CRow, CRow]#OnTimerContext[_],
--- End diff --

I'm not a scala expert, but is this change somehow related to adding the 
`getCurrentKey()` method? 


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-14 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r168121129
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
 ---
@@ -397,17 +396,16 @@ public void processElement(Integer value, Context 
ctx, Collector out) th
}
 
@Override
-   public void onTimer(
-   long timestamp,
-   OnTimerContext ctx,
-   Collector out) throws Exception {
+   public void onTimer(long timestamp, OnTimerContext ctx, 
Collector out) throws Exception {
}
}
 
private static class TriggeringFlatMapFunction extends 
ProcessFunction {
 
private static final long serialVersionUID = 1L;
 
+   static final int TEST_VALUE = 17;
+
private final TimeDomain timeDomain;
 
public TriggeringFlatMapFunction(TimeDomain timeDomain) {
--- End diff --

rename `timeDomain` -> `expectedTimeDomain` and add `expectedKey` and add 
assertion for the expected key to `onTimer()` method triggered both in 
`KeyedProcessOperatorTest#testEventTimeTimers` and 
`KeyedProcessOperatorTest#testProcessingTimeTimers`


---


[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-13 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5481

[FLINK-8560] Access to the current key in ProcessFunction after keyBy()

## What is the purpose of the change

Currently, it is required to store the key of a keyBy() in the 
processElement method to have access to it in the OnTimerContext.

This is not so good as you have to check in the processElement method for 
every element if the key is already stored and set it if it's not already set.

A possible solution would adding OnTimerContext#getCurrentKey() or a 
similar method. Maybe having it in the open() method could maybe work as well.

## Brief change log

added `OnTimerContext#getCurrentKey()`

One limitation is that this impl of `getCurrentKey()` currently is not 
strongly typed. Declaring the key's type requires adding a new generic type to 
`ProcessFunction` - making the declaration from `ProcessFunction` to 
`ProcessFunction`. I'm worried it may break user's application, so 
I decide to make `getCurrentKey()` return an object. I'd like to discuss the 
feasibility of having strong type.

## Verifying this change

This change is already covered by existing tests, such as 
*KeyedProcessOperatorTest*.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs / JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8560

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5481.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5481


commit c5b8a4f27094b88c8641e2bdd30ea0ca65a7a4be
Author: Bowen Li 
Date:   2018-02-13T06:33:06Z

[FLINK-8560] Access to the current key in ProcessFunction after keyBy()




---