This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 545a0053ff2f0c467d405c3387e1a22b8723f6a4
Author: Weijie Guo <res...@163.com>
AuthorDate: Tue Apr 9 17:56:11 2024 +0800

    [FLINK-34549][API] Implement ProcessingTimeManager and expose it via 
PartitionedContext
---
 .../datastream/api/context/PartitionedContext.java |  3 +
 .../function/OneInputStreamProcessFunction.java    |  9 +++
 .../TwoInputBroadcastStreamProcessFunction.java    |  9 +++
 .../TwoInputNonBroadcastStreamProcessFunction.java |  9 +++
 .../function/TwoOutputStreamProcessFunction.java   | 14 ++++
 .../impl/context/DefaultPartitionedContext.java    | 12 +++-
 .../impl/context/DefaultProcessingTimeManager.java | 47 ++++++++++++++
 .../context/UnsupportedProcessingTimeManager.java  | 50 +++++++++++++++
 .../impl/operators/KeyedProcessOperator.java       | 42 +++++++++++-
 .../KeyedTwoInputBroadcastProcessOperator.java     | 42 +++++++++++-
 .../KeyedTwoInputNonBroadcastProcessOperator.java  | 42 +++++++++++-
 .../operators/KeyedTwoOutputProcessOperator.java   | 42 +++++++++++-
 .../datastream/impl/operators/ProcessOperator.java |  9 ++-
 .../TwoInputBroadcastProcessOperator.java          |  9 ++-
 .../TwoInputNonBroadcastProcessOperator.java       |  9 ++-
 .../impl/operators/TwoOutputProcessOperator.java   |  9 ++-
 .../context/DefaultProcessingTimeManagerTest.java  | 74 ++++++++++++++++++++++
 17 files changed, 422 insertions(+), 9 deletions(-)

diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/PartitionedContext.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/PartitionedContext.java
index e1b59a4a371..bb8ae9cf3d4 100644
--- 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/PartitionedContext.java
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/PartitionedContext.java
@@ -28,4 +28,7 @@ import org.apache.flink.annotation.Experimental;
 public interface PartitionedContext extends RuntimeContext {
     /** Get the {@link StateManager} of this process function. */
     StateManager getStateManager();
+
+    /** Get the {@link ProcessingTimeManager} of this process function. */
+    ProcessingTimeManager getProcessingTimeManager();
 }
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/OneInputStreamProcessFunction.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/OneInputStreamProcessFunction.java
index 3a6e7adcbf7..342de0a6ed0 100644
--- 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/OneInputStreamProcessFunction.java
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/OneInputStreamProcessFunction.java
@@ -42,4 +42,13 @@ public interface OneInputStreamProcessFunction<IN, OUT> 
extends ProcessFunction
      * @param ctx the context in which this function is executed.
      */
     default void endInput(NonPartitionedContext<OUT> ctx) {}
+
+    /**
+     * Callback invoked when a registered processing-time timer fires.
+     *
+     * @param timestamp the timestamp of the timer that triggered this callback.
+     * @param output the collector used to emit records.
+     * @param ctx the runtime context in which this function is executed.
+     */
+    default void onProcessingTimer(long timestamp, Collector<OUT> output, 
PartitionedContext ctx) {}
 }
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputBroadcastStreamProcessFunction.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputBroadcastStreamProcessFunction.java
index f567d8f5358..32d7168c59c 100644
--- 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputBroadcastStreamProcessFunction.java
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputBroadcastStreamProcessFunction.java
@@ -65,4 +65,13 @@ public interface TwoInputBroadcastStreamProcessFunction<IN1, 
IN2, OUT> extends P
      * @param ctx the context in which this function is executed.
      */
     default void endBroadcastInput(NonPartitionedContext<OUT> ctx) {}
+
+    /**
+     * Callback invoked when a registered processing-time timer fires.
+     *
+     * @param timestamp the timestamp of the timer that triggered this callback.
+     * @param output the collector used to emit records.
+     * @param ctx the runtime context in which this function is executed.
+     */
+    default void onProcessingTimer(long timestamp, Collector<OUT> output, 
PartitionedContext ctx) {}
 }
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputNonBroadcastStreamProcessFunction.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputNonBroadcastStreamProcessFunction.java
index f90b3477f31..6fe67a28f2f 100644
--- 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputNonBroadcastStreamProcessFunction.java
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputNonBroadcastStreamProcessFunction.java
@@ -61,4 +61,13 @@ public interface 
TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extend
      * @param ctx the context in which this function is executed.
      */
     default void endSecondInput(NonPartitionedContext<OUT> ctx) {}
+
+    /**
+     * Callback invoked when a registered processing-time timer fires.
+     *
+     * @param timestamp the timestamp of the timer that triggered this callback.
+     * @param output the collector used to emit records.
+     * @param ctx the runtime context in which this function is executed.
+     */
+    default void onProcessingTimer(long timestamp, Collector<OUT> output, 
PartitionedContext ctx) {}
 }
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoOutputStreamProcessFunction.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoOutputStreamProcessFunction.java
index 18c3181c037..a9f76e70239 100644
--- 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoOutputStreamProcessFunction.java
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoOutputStreamProcessFunction.java
@@ -45,4 +45,18 @@ public interface TwoOutputStreamProcessFunction<IN, OUT1, 
OUT2> extends ProcessF
      * @param ctx the context in which this function is executed.
      */
     default void endInput(TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) {}
+
+    /**
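+     * Callback invoked when a registered processing-time timer fires.
+     *
+     * @param timestamp the timestamp of the timer that triggered this callback.
+     * @param output1 the collector used to emit records to the first output.
+     * @param output2 the collector used to emit records to the second output.
+     * @param ctx the runtime context in which this function is executed.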
+     */
+    default void onProcessingTimer(
+            long timestamp,
+            Collector<OUT1> output1,
+            Collector<OUT2> output2,
+            PartitionedContext ctx) {}
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
index 4ba359e5e66..2d44a003c97 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
@@ -20,6 +20,7 @@ package org.apache.flink.datastream.impl.context;
 
 import org.apache.flink.datastream.api.context.JobInfo;
 import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.context.ProcessingTimeManager;
 import org.apache.flink.datastream.api.context.RuntimeContext;
 import org.apache.flink.datastream.api.context.TaskInfo;
 
@@ -32,12 +33,16 @@ public class DefaultPartitionedContext implements 
PartitionedContext {
 
     private final DefaultStateManager stateManager;
 
+    private final ProcessingTimeManager processingTimeManager;
+
     public DefaultPartitionedContext(
             RuntimeContext context,
             Supplier<Object> currentKeySupplier,
-            Consumer<Object> currentKeySetter) {
+            Consumer<Object> currentKeySetter,
+            ProcessingTimeManager processingTimeManager) {
         this.context = context;
         this.stateManager = new DefaultStateManager(currentKeySupplier, 
currentKeySetter);
+        this.processingTimeManager = processingTimeManager;
     }
 
     @Override
@@ -54,4 +59,9 @@ public class DefaultPartitionedContext implements 
PartitionedContext {
     public DefaultStateManager getStateManager() {
         return stateManager;
     }
+
+    @Override
+    public ProcessingTimeManager getProcessingTimeManager() {
+        return processingTimeManager;
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultProcessingTimeManager.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultProcessingTimeManager.java
new file mode 100644
index 00000000000..8a9fa43e670
--- /dev/null
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultProcessingTimeManager.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.context;
+
+import org.apache.flink.datastream.api.context.ProcessingTimeManager;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+
+/** The default implementation of {@link ProcessingTimeManager}. */
+public class DefaultProcessingTimeManager implements ProcessingTimeManager {
+    private final InternalTimerService<VoidNamespace> timerService;
+
+    public DefaultProcessingTimeManager(InternalTimerService<VoidNamespace> 
timerService) {
+        this.timerService = timerService;
+    }
+
+    @Override
+    public void registerTimer(long timestamp) {
+        timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, 
timestamp);
+    }
+
+    @Override
+    public void deleteTimer(long timestamp) {
+        timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, 
timestamp);
+    }
+
+    @Override
+    public long currentTime() {
+        return timerService.currentProcessingTime();
+    }
+}
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/UnsupportedProcessingTimeManager.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/UnsupportedProcessingTimeManager.java
new file mode 100644
index 00000000000..2da2119b42f
--- /dev/null
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/UnsupportedProcessingTimeManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.context;
+
+import org.apache.flink.datastream.api.context.ProcessingTimeManager;
+
+/**
+ * A special implementation of {@link ProcessingTimeManager} whose methods are all unsupported.
+ * This is used for contexts that cannot define a key.
+ */
+public class UnsupportedProcessingTimeManager implements ProcessingTimeManager 
{
+    public static final UnsupportedProcessingTimeManager INSTANCE =
+            new UnsupportedProcessingTimeManager();
+
+    private UnsupportedProcessingTimeManager() {}
+
+    @Override
+    public void registerTimer(long timestamp) {
+        throw new UnsupportedOperationException(
+                "Register processing timer is unsupported for non-keyed 
operator.");
+    }
+
+    @Override
+    public void deleteTimer(long timestamp) {
+        throw new UnsupportedOperationException(
+                "Delete processing timer is unsupported for non-keyed 
operator.");
+    }
+
+    @Override
+    public long currentTime() {
+        throw new UnsupportedOperationException(
+                "Get current processing time is unsupported for non-keyed 
operator.");
+    }
+}
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java
index fcd29667bcd..1b729f8c21f 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java
@@ -19,16 +19,25 @@
 package org.apache.flink.datastream.impl.operators;
 
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.datastream.api.context.ProcessingTimeManager;
 import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
 import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
 import org.apache.flink.datastream.impl.common.KeyCheckedOutputCollector;
 import org.apache.flink.datastream.impl.common.OutputCollector;
 import org.apache.flink.datastream.impl.common.TimestampCollector;
+import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
 
 import javax.annotation.Nullable;
 
 /** Operator for {@link OneInputStreamProcessFunction} in {@link 
KeyedPartitionStream}. */
-public class KeyedProcessOperator<KEY, IN, OUT> extends ProcessOperator<IN, 
OUT> {
+public class KeyedProcessOperator<KEY, IN, OUT> extends ProcessOperator<IN, 
OUT>
+        implements Triggerable<KEY, VoidNamespace> {
+    private transient InternalTimerService<VoidNamespace> timerService;
 
     @Nullable private final KeySelector<OUT, KEY> outKeySelector;
 
@@ -43,6 +52,13 @@ public class KeyedProcessOperator<KEY, IN, OUT> extends 
ProcessOperator<IN, OUT>
         this.outKeySelector = outKeySelector;
     }
 
+    @Override
+    public void open() throws Exception {
+        this.timerService =
+                getInternalTimerService("processing timer", 
VoidNamespaceSerializer.INSTANCE, this);
+        super.open();
+    }
+
     @Override
     protected TimestampCollector<OUT> getOutputCollector() {
         return outKeySelector != null
@@ -55,4 +71,28 @@ public class KeyedProcessOperator<KEY, IN, OUT> extends 
ProcessOperator<IN, OUT>
     protected Object currentKey() {
         return getCurrentKey();
     }
+
+    @Override
+    public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws 
Exception {
+        // do nothing at the moment.
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) 
throws Exception {
+        // align the key context with the registered timer.
+        partitionedContext
+                .getStateManager()
+                .executeInKeyContext(
+                        () ->
+                                userFunction.onProcessingTimer(
+                                        timer.getTimestamp(),
+                                        getOutputCollector(),
+                                        partitionedContext),
+                        timer.getKey());
+    }
+
+    @Override
+    protected ProcessingTimeManager getProcessingTimeManager() {
+        return new DefaultProcessingTimeManager(timerService);
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java
index f1852121ab0..d303d0cf186 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java
@@ -19,17 +19,27 @@
 package org.apache.flink.datastream.impl.operators;
 
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.datastream.api.context.ProcessingTimeManager;
 import 
org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
 import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
 import org.apache.flink.datastream.impl.common.KeyCheckedOutputCollector;
 import org.apache.flink.datastream.impl.common.OutputCollector;
 import org.apache.flink.datastream.impl.common.TimestampCollector;
+import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
 
 import javax.annotation.Nullable;
 
 /** Operator for {@link TwoInputBroadcastStreamProcessFunction} in {@link 
KeyedPartitionStream}. */
 public class KeyedTwoInputBroadcastProcessOperator<KEY, IN1, IN2, OUT>
-        extends TwoInputBroadcastProcessOperator<IN1, IN2, OUT> {
+        extends TwoInputBroadcastProcessOperator<IN1, IN2, OUT>
+        implements Triggerable<KEY, VoidNamespace> {
+    private transient InternalTimerService<VoidNamespace> timerService;
+
     @Nullable private final KeySelector<OUT, KEY> outKeySelector;
 
     public KeyedTwoInputBroadcastProcessOperator(
@@ -44,6 +54,13 @@ public class KeyedTwoInputBroadcastProcessOperator<KEY, IN1, 
IN2, OUT>
         this.outKeySelector = outKeySelector;
     }
 
+    @Override
+    public void open() throws Exception {
+        this.timerService =
+                getInternalTimerService("processing timer", 
VoidNamespaceSerializer.INSTANCE, this);
+        super.open();
+    }
+
     @Override
     protected TimestampCollector<OUT> getOutputCollector() {
         return outKeySelector == null
@@ -56,4 +73,27 @@ public class KeyedTwoInputBroadcastProcessOperator<KEY, IN1, 
IN2, OUT>
     protected Object currentKey() {
         return getCurrentKey();
     }
+
+    protected ProcessingTimeManager getProcessingTimeManager() {
+        return new DefaultProcessingTimeManager(timerService);
+    }
+
+    @Override
+    public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws 
Exception {
+        // do nothing at the moment.
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) 
throws Exception {
+        // align the key context with the registered timer.
+        partitionedContext
+                .getStateManager()
+                .executeInKeyContext(
+                        () ->
+                                userFunction.onProcessingTimer(
+                                        timer.getTimestamp(),
+                                        getOutputCollector(),
+                                        partitionedContext),
+                        timer.getKey());
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java
index 2b636461f50..36ef9583599 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java
@@ -19,11 +19,18 @@
 package org.apache.flink.datastream.impl.operators;
 
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.datastream.api.context.ProcessingTimeManager;
 import 
org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
 import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
 import org.apache.flink.datastream.impl.common.KeyCheckedOutputCollector;
 import org.apache.flink.datastream.impl.common.OutputCollector;
 import org.apache.flink.datastream.impl.common.TimestampCollector;
+import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
 
 import javax.annotation.Nullable;
 
@@ -31,7 +38,10 @@ import javax.annotation.Nullable;
  * Operator for {@link TwoInputNonBroadcastStreamProcessFunction} in {@link 
KeyedPartitionStream}.
  */
 public class KeyedTwoInputNonBroadcastProcessOperator<KEY, IN1, IN2, OUT>
-        extends TwoInputNonBroadcastProcessOperator<IN1, IN2, OUT> {
+        extends TwoInputNonBroadcastProcessOperator<IN1, IN2, OUT>
+        implements Triggerable<KEY, VoidNamespace> {
+    private transient InternalTimerService<VoidNamespace> timerService;
+
     @Nullable private final KeySelector<OUT, KEY> outKeySelector;
 
     public KeyedTwoInputNonBroadcastProcessOperator(
@@ -46,6 +56,13 @@ public class KeyedTwoInputNonBroadcastProcessOperator<KEY, 
IN1, IN2, OUT>
         this.outKeySelector = outKeySelector;
     }
 
+    @Override
+    public void open() throws Exception {
+        this.timerService =
+                getInternalTimerService("processing timer", 
VoidNamespaceSerializer.INSTANCE, this);
+        super.open();
+    }
+
     @Override
     protected TimestampCollector<OUT> getOutputCollector() {
         return outKeySelector == null
@@ -58,4 +75,27 @@ public class KeyedTwoInputNonBroadcastProcessOperator<KEY, 
IN1, IN2, OUT>
     protected Object currentKey() {
         return getCurrentKey();
     }
+
+    protected ProcessingTimeManager getProcessingTimeManager() {
+        return new DefaultProcessingTimeManager(timerService);
+    }
+
+    @Override
+    public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws 
Exception {
+        // do nothing at the moment.
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) 
throws Exception {
+        // align the key context with the registered timer.
+        partitionedContext
+                .getStateManager()
+                .executeInKeyContext(
+                        () ->
+                                userFunction.onProcessingTimer(
+                                        timer.getTimestamp(),
+                                        getOutputCollector(),
+                                        partitionedContext),
+                        timer.getKey());
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java
index b4c4ec786fe..aa7de642bb1 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java
@@ -19,10 +19,17 @@
 package org.apache.flink.datastream.impl.operators;
 
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.datastream.api.context.ProcessingTimeManager;
 import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
 import org.apache.flink.datastream.impl.common.KeyCheckedOutputCollector;
 import org.apache.flink.datastream.impl.common.OutputCollector;
 import org.apache.flink.datastream.impl.common.TimestampCollector;
+import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
 
@@ -30,7 +37,9 @@ import javax.annotation.Nullable;
 
 /** */
 public class KeyedTwoOutputProcessOperator<KEY, IN, OUT_MAIN, OUT_SIDE>
-        extends TwoOutputProcessOperator<IN, OUT_MAIN, OUT_SIDE> {
+        extends TwoOutputProcessOperator<IN, OUT_MAIN, OUT_SIDE>
+        implements Triggerable<KEY, VoidNamespace> {
+    private transient InternalTimerService<VoidNamespace> timerService;
 
     @Nullable private final KeySelector<OUT_MAIN, KEY> mainOutKeySelector;
 
@@ -56,6 +65,13 @@ public class KeyedTwoOutputProcessOperator<KEY, IN, 
OUT_MAIN, OUT_SIDE>
         this.sideOutKeySelector = sideOutKeySelector;
     }
 
+    @Override
+    public void open() throws Exception {
+        this.timerService =
+                getInternalTimerService("processing timer", 
VoidNamespaceSerializer.INSTANCE, this);
+        super.open();
+    }
+
     @Override
     protected TimestampCollector<OUT_MAIN> getMainCollector() {
         return mainOutKeySelector != null && sideOutKeySelector != null
@@ -80,4 +96,28 @@ public class KeyedTwoOutputProcessOperator<KEY, IN, 
OUT_MAIN, OUT_SIDE>
     protected Object currentKey() {
         return getCurrentKey();
     }
+
+    protected ProcessingTimeManager getProcessingTimeManager() {
+        return new DefaultProcessingTimeManager(timerService);
+    }
+
+    @Override
+    public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws 
Exception {
+        // do nothing at the moment.
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) 
throws Exception {
+        // align the key context with the registered timer.
+        partitionedContext
+                .getStateManager()
+                .executeInKeyContext(
+                        () ->
+                                userFunction.onProcessingTimer(
+                                        timer.getTimestamp(),
+                                        getMainCollector(),
+                                        getSideCollector(),
+                                        partitionedContext),
+                        timer.getKey());
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
index 50228806f3e..91f9821fea2 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
@@ -19,12 +19,14 @@
 package org.apache.flink.datastream.impl.operators;
 
 import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.datastream.api.context.ProcessingTimeManager;
 import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
 import org.apache.flink.datastream.impl.common.OutputCollector;
 import org.apache.flink.datastream.impl.common.TimestampCollector;
 import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
+import 
org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -64,7 +66,8 @@ public class ProcessOperator<IN, OUT>
                         taskInfo.getMaxNumberOfParallelSubtasks(),
                         taskInfo.getTaskName());
         partitionedContext =
-                new DefaultPartitionedContext(context, this::currentKey, 
this::setCurrentKey);
+                new DefaultPartitionedContext(
+                        context, this::currentKey, this::setCurrentKey, 
getProcessingTimeManager());
         nonPartitionedContext = new DefaultNonPartitionedContext<>(context);
         outputCollector = getOutputCollector();
     }
@@ -87,4 +90,8 @@ public class ProcessOperator<IN, OUT>
     protected Object currentKey() {
         throw new UnsupportedOperationException("The key is only defined for 
keyed operator");
     }
+
+    protected ProcessingTimeManager getProcessingTimeManager() {
+        return UnsupportedProcessingTimeManager.INSTANCE;
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
index a31f4c624bf..b8cba8fd17d 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
@@ -19,12 +19,14 @@
 package org.apache.flink.datastream.impl.operators;
 
 import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.datastream.api.context.ProcessingTimeManager;
 import 
org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
 import org.apache.flink.datastream.impl.common.OutputCollector;
 import org.apache.flink.datastream.impl.common.TimestampCollector;
 import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
+import 
org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedMultiInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -68,7 +70,8 @@ public class TwoInputBroadcastProcessOperator<IN1, IN2, OUT>
                         taskInfo.getMaxNumberOfParallelSubtasks(),
                         taskInfo.getTaskName());
         this.partitionedContext =
-                new DefaultPartitionedContext(context, this::currentKey, this::setCurrentKey);
+                new DefaultPartitionedContext(
+                        context, this::currentKey, this::setCurrentKey, getProcessingTimeManager());
         this.nonPartitionedContext = new DefaultNonPartitionedContext<>(context);
     }
 
@@ -103,4 +106,8 @@ public class TwoInputBroadcastProcessOperator<IN1, IN2, OUT>
     protected Object currentKey() {
         throw new UnsupportedOperationException("The key is only defined for keyed operator");
     }
+
+    protected ProcessingTimeManager getProcessingTimeManager() { // fed to the partitioned context
+        return UnsupportedProcessingTimeManager.INSTANCE; // non-keyed operator (cf. currentKey())
+    }
 }
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
index 6d34c9daf72..19dd7859f40 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
@@ -19,12 +19,14 @@
 package org.apache.flink.datastream.impl.operators;
 
 import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.datastream.api.context.ProcessingTimeManager;
 import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
 import org.apache.flink.datastream.impl.common.OutputCollector;
 import org.apache.flink.datastream.impl.common.TimestampCollector;
 import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
+import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedMultiInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -68,7 +70,8 @@ public class TwoInputNonBroadcastProcessOperator<IN1, IN2, OUT>
                         taskInfo.getMaxNumberOfParallelSubtasks(),
                         taskInfo.getTaskName());
         this.partitionedContext =
-                new DefaultPartitionedContext(context, this::currentKey, this::setCurrentKey);
+                new DefaultPartitionedContext(
+                        context, this::currentKey, this::setCurrentKey, getProcessingTimeManager());
         this.nonPartitionedContext = new DefaultNonPartitionedContext<>(context);
     }
 
@@ -103,4 +106,8 @@ public class TwoInputNonBroadcastProcessOperator<IN1, IN2, OUT>
     protected Object currentKey() {
         throw new UnsupportedOperationException("The key is only defined for keyed operator");
     }
+
+    protected ProcessingTimeManager getProcessingTimeManager() { // fed to the partitioned context
+        return UnsupportedProcessingTimeManager.INSTANCE; // non-keyed operator (cf. currentKey())
+    }
 }
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
index 4356bbd78aa..d12fc79c957 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.datastream.impl.operators;
 
 import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.datastream.api.context.ProcessingTimeManager;
 import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext;
 import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
 import org.apache.flink.datastream.impl.common.OutputCollector;
@@ -26,6 +27,7 @@ import org.apache.flink.datastream.impl.common.TimestampCollector;
 import org.apache.flink.datastream.impl.context.DefaultPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
 import org.apache.flink.datastream.impl.context.DefaultTwoOutputNonPartitionedContext;
+import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -79,7 +81,8 @@ public class TwoOutputProcessOperator<IN, OUT_MAIN, OUT_SIDE>
                         taskInfo.getMaxNumberOfParallelSubtasks(),
                         taskInfo.getTaskName());
         this.partitionedContext =
-                new DefaultPartitionedContext(context, this::currentKey, this::setCurrentKey);
+                new DefaultPartitionedContext(
+                        context, this::currentKey, this::setCurrentKey, getProcessingTimeManager());
         this.nonPartitionedContext = new DefaultTwoOutputNonPartitionedContext<>(context);
     }
 
@@ -108,6 +111,10 @@ public class TwoOutputProcessOperator<IN, OUT_MAIN, OUT_SIDE>
         throw new UnsupportedOperationException("The key is only defined for keyed operator");
     }
 
+    protected ProcessingTimeManager getProcessingTimeManager() { // fed to the partitioned context
+        return UnsupportedProcessingTimeManager.INSTANCE; // non-keyed operator (cf. currentKey())
+    }
+
     /**
      * This is a special implementation of {@link TimestampCollector} that using side-output
      * mechanism to emit data.
diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultProcessingTimeManagerTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultProcessingTimeManagerTest.java
new file mode 100644
index 00000000000..6eb95f9b4ee
--- /dev/null
+++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultProcessingTimeManagerTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.context;
+
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TestInternalTimerService;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DefaultProcessingTimeManager}. */
+class DefaultProcessingTimeManagerTest {
+    @Test
+    void testCurrentProcessingTime() throws Exception {
+        TestInternalTimerService<Integer, VoidNamespace> timerService = getTimerService();
+        DefaultProcessingTimeManager manager = new DefaultProcessingTimeManager(timerService);
+        long newTime = 100L;
+        timerService.advanceProcessingTime(newTime); // advance on the service, read via manager
+        assertThat(manager.currentTime()).isEqualTo(newTime);
+    }
+
+    @Test
+    void testRegisterProcessingTimer() {
+        TestInternalTimerService<Integer, VoidNamespace> timerService = getTimerService();
+        DefaultProcessingTimeManager manager = new DefaultProcessingTimeManager(timerService);
+        assertThat(timerService.numProcessingTimeTimers()).isZero();
+        manager.registerTimer(100L); // must show up as a timer on the backing service
+        assertThat(timerService.numProcessingTimeTimers()).isOne();
+    }
+
+    @Test
+    void testDeleteProcessingTimeTimer() {
+        TestInternalTimerService<Integer, VoidNamespace> timerService = getTimerService();
+        DefaultProcessingTimeManager manager = new DefaultProcessingTimeManager(timerService);
+        long time = 100L;
+        manager.registerTimer(time);
+        assertThat(timerService.numProcessingTimeTimers()).isOne();
+        manager.deleteTimer(time); // deleting the same timestamp must remove that timer
+        assertThat(timerService.numProcessingTimeTimers()).isZero();
+    }
+
+    @NotNull
+    private static TestInternalTimerService<Integer, VoidNamespace> getTimerService() {
+        return new TestInternalTimerService<>(
+                new KeyContext() { // stub key context: setter is a no-op, key is always "key"
+                    @Override
+                    public void setCurrentKey(Object key) {}
+
+                    @Override
+                    public Object getCurrentKey() {
+                        return "key";
+                    }
+                });
+    }
+}


Reply via email to