[ 
https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617201#comment-16617201
 ] 

ASF GitHub Bot commented on FLINK-10327:
----------------------------------------

pnowojski closed pull request #6687: [FLINK-10327][streaming] Expose 
processWatermarks notifications to (Co)ProcessFunction
URL: https://github.com/apache/flink/pull/6687
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
index c2c130ef58d..faf8fc7943a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
@@ -84,6 +85,15 @@
         */
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> 
out) throws Exception {}
 
+       /**
+        * Called when watermark has advanced.
+        *
+        * @param mark The {@link Watermark} that triggered this call
+        * @param out The collector to emit resulting elements to
+        */
+       public void processWatermark(Watermark mark, Collector<O> out) throws 
Exception {
+       }
+
        /**
         * Information available in an invocation of {@link 
#processElement(Object, Context, Collector)}
         * or {@link #onTimer(long, OnTimerContext, Collector)}.
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
index 20c10840c2c..39a9a7d2cd5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
@@ -98,6 +99,39 @@
         */
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> 
out) throws Exception {}
 
+       /**
+        * Called when combined watermark of both inputs has advanced.
+        *
+        * @param mark The {@link Watermark} that triggered this call
+        * @param out The collector to emit resulting elements to
+        */
+       public void processWatermark(Watermark mark, Collector<OUT> out) throws 
Exception {
+       }
+
+       /**
+        * Called when watermark of the first input has advanced. If this 
update will trigger an update
+        * of the combined watermark, this call will be followed by {@link 
#processWatermark(Watermark, Collector)}
+        * call.
+        *
+        * @param mark The {@link Watermark} that triggered this call
+        * @param out The collector to emit resulting elements to. Results 
emitted will have a timestamp
+        *            set to the value before advancing combined watermark.
+        */
+       public void processWatermark1(Watermark mark, Collector<OUT> out) 
throws Exception {
+       }
+
+       /**
+        * Called when watermark of the second input has advanced. If this 
update will trigger an update
+        * of the combined watermark, this call will be followed by {@link 
#processWatermark(Watermark, Collector)}
+        * call.
+        *
+        * @param mark The {@link Watermark} that triggered this call
+        * @param out The collector to emit resulting elements to. Results 
emitted will have a timestamp
+        *            set to the value before advancing combined watermark.
+        */
+       public void processWatermark2(Watermark mark, Collector<OUT> out) 
throws Exception {
+       }
+
        /**
         * Information available in an invocation of {@link 
#processElement1(Object, Context, Collector)}/
         * {@link #processElement2(Object, Context, Collector)}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
index 25c93a60b3d..bd18c40378b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
@@ -69,6 +69,8 @@ public void processElement(StreamRecord<IN> element) throws 
Exception {
 
        @Override
        public void processWatermark(Watermark mark) throws Exception {
+               collector.setAbsoluteTimestamp(mark.getTimestamp());
+               userFunction.processWatermark(mark, collector);
                super.processWatermark(mark);
                this.currentWatermark = mark.getTimestamp();
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
index f6f2846bc25..fda22856a67 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
@@ -78,8 +78,25 @@ public void processElement2(StreamRecord<IN2> element) 
throws Exception {
                context.element = null;
        }
 
+       @Override
+       public void processWatermark1(Watermark mark) throws Exception {
+               collector.setAbsoluteTimestamp(currentWatermark);
+               userFunction.processWatermark1(mark, collector);
+               super.processWatermark1(mark);
+       }
+
+       @Override
+       public void processWatermark2(Watermark mark) throws Exception {
+               collector.setAbsoluteTimestamp(currentWatermark);
+               userFunction.processWatermark2(mark, collector);
+               super.processWatermark2(mark);
+       }
+
        @Override
        public void processWatermark(Watermark mark) throws Exception {
+               collector.setAbsoluteTimestamp(mark.getTimestamp());
+               userFunction.processWatermark(mark, collector);
+
                super.processWatermark(mark);
                currentWatermark = mark.getTimestamp();
        }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
index 2d9d4d39496..051c900bc78 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
@@ -42,6 +42,28 @@
        @Rule
        public ExpectedException expectedException = ExpectedException.none();
 
+       @Test
+       public void testProcessWatermark() throws Exception {
+               try (OneInputStreamOperatorTestHarness<Integer, String> 
testHarness =
+                                new OneInputStreamOperatorTestHarness<>(new 
ProcessOperator<>(new OutputProcessWatermarkArguments()))) {
+
+                       testHarness.setup();
+                       testHarness.open();
+
+                       ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+                       testHarness.processWatermark(new Watermark(42));
+                       expectedOutput.add(new StreamRecord<>("W_42", 42L));
+                       expectedOutput.add(new Watermark(42L));
+
+                       testHarness.processWatermark(new Watermark(44));
+                       expectedOutput.add(new StreamRecord<>("W_44", 44L));
+                       expectedOutput.add(new Watermark(44L));
+
+                       TestHarnessUtil.assertOutputEquals("Output was not 
correct.", expectedOutput, testHarness.getOutput());
+               }
+       }
+
        @Test
        public void testTimestampAndWatermarkQuerying() throws Exception {
 
@@ -209,4 +231,15 @@ public void onTimer(
                                Collector<String> out) throws Exception {
                }
        }
+
+       private static class OutputProcessWatermarkArguments<IN> extends 
ProcessFunction<IN, String> {
+               @Override
+               public void processElement(IN value, Context ctx, 
Collector<String> out) throws Exception {
+               }
+
+               @Override
+               public void processWatermark(Watermark mark, Collector<String> 
out) throws Exception {
+                       out.collect("W_" + mark.getTimestamp());
+               }
+       }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
index beb5bf554c0..47e827ec0ff 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
@@ -35,6 +35,29 @@
  */
 public class CoProcessOperatorTest extends TestLogger {
 
+       @Test
+       public void testProcessWatermark() throws Exception {
+
+               try (TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
+                       new TwoInputStreamOperatorTestHarness<>(new 
CoProcessOperator<>(new OutputProcessWatermarkArguments()))) {
+
+                       testHarness.setup();
+                       testHarness.open();
+
+                       ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+                       testHarness.processWatermark1(new Watermark(42));
+                       expectedOutput.add(new StreamRecord<>("W1_42", 
Long.MIN_VALUE));
+
+                       testHarness.processWatermark2(new Watermark(44));
+                       expectedOutput.add(new StreamRecord<>("W2_44", 
Long.MIN_VALUE));
+                       expectedOutput.add(new StreamRecord<>("W_42", 42L));
+                       expectedOutput.add(new Watermark(42L));
+
+                       TestHarnessUtil.assertOutputEquals("Output was not 
correct.", expectedOutput, testHarness.getOutput());
+               }
+       }
+
        @Test
        public void testTimestampAndWatermarkQuerying() throws Exception {
 
@@ -138,4 +161,29 @@ public void onTimer(
                                Collector<String> out) throws Exception {
                }
        }
+
+       private static class OutputProcessWatermarkArguments<IN1, IN2> extends 
CoProcessFunction<IN1, IN2, String> {
+               @Override
+               public void processElement1(IN1 value, Context ctx, 
Collector<String> out) throws Exception {
+               }
+
+               @Override
+               public void processElement2(IN2 value, Context ctx, 
Collector<String> out) throws Exception {
+               }
+
+               @Override
+               public void processWatermark(Watermark mark, Collector<String> 
out) {
+                       out.collect("W_" + mark.getTimestamp());
+               }
+
+               @Override
+               public void processWatermark1(Watermark mark, Collector<String> 
out) {
+                       out.collect("W1_" + mark.getTimestamp());
+               }
+
+               @Override
+               public void processWatermark2(Watermark mark, Collector<String> 
out) {
+                       out.collect("W2_" + mark.getTimestamp());
+               }
+       }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-10327
>                 URL: https://issues.apache.org/jira/browse/FLINK-10327
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Major
>              Labels: pull-request-available
>
> Currently {{CoProcessFunction}} can not react to changes watermark 
> advancement. By passing {{processWatermark}} calls to function we would give 
> a way to perform some actions on watermark advancement, like state clean up 
> or emitting some results after accumulating some data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to