This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e3cd986cd4 [FLINK-30601][runtime] Omit "setKeyContextElement" call 
for non-keyed stream/operators to improve performance
4e3cd986cd4 is described below

commit 4e3cd986cd4304c699d5c7d368ffd6e0ead5a096
Author: Lijie Wang <wangdachui9...@gmail.com>
AuthorDate: Mon Jan 9 14:15:02 2023 +0800

    [FLINK-30601][runtime] Omit "setKeyContextElement" call for non-keyed 
stream/operators to improve performance
    
    This closes #21621
---
 .../state/api/output/BootstrapStreamTask.java      |  10 +-
 .../flink/state/api/output/BoundedStreamTask.java  |   8 +-
 .../streaming/api/operators/AbstractInput.java     |   7 +-
 .../api/operators/AbstractStreamOperator.java      |  13 +
 .../streaming/api/operators/KeyContextHandler.java |  67 ++++
 .../streaming/runtime/io/RecordProcessorUtils.java | 188 ++++++++++
 .../io/StreamMultipleInputProcessorFactory.java    |   8 +-
 .../runtime/io/StreamTwoInputProcessorFactory.java |  20 +-
 .../streaming/runtime/tasks/ChainingOutput.java    |   7 +-
 .../runtime/tasks/CopyingChainingOutput.java       |   3 +-
 .../runtime/tasks/OneInputStreamTask.java          |   7 +-
 .../runtime/io/RecordProcessorUtilsTest.java       | 407 +++++++++++++++++++++
 .../operators/multipleinput/input/InputBase.java   |  12 +-
 13 files changed, 723 insertions(+), 34 deletions(-)

diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BootstrapStreamTask.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BootstrapStreamTask.java
index 8a5e832b28f..936fe88b846 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BootstrapStreamTask.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BootstrapStreamTask.java
@@ -27,12 +27,14 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil;
+import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
@@ -53,6 +55,8 @@ class BootstrapStreamTask<IN, OUT, OP extends 
OneInputStreamOperator<IN, OUT> &
 
     private final Output<StreamRecord<OUT>> output;
 
+    private ThrowingConsumer<StreamRecord<IN>, Exception> recordProcessor;
+
     BootstrapStreamTask(
             Environment environment,
             BlockingQueue<StreamElement> input,
@@ -82,16 +86,14 @@ class BootstrapStreamTask<IN, OUT, OP extends 
OneInputStreamOperator<IN, OUT> &
         mainOperator = mainOperatorAndTimeService.f0;
         mainOperator.initializeState(createStreamTaskStateInitializer());
         mainOperator.open();
+        recordProcessor = 
RecordProcessorUtils.getRecordProcessor(mainOperator);
     }
 
     @Override
     protected void processInput(MailboxDefaultAction.Controller controller) 
throws Exception {
         StreamElement element = input.take();
         if (element.isRecord()) {
-            StreamRecord<IN> streamRecord = element.asRecord();
-
-            mainOperator.setKeyContextElement1(streamRecord);
-            mainOperator.processElement(streamRecord);
+            recordProcessor.accept(element.asRecord());
         } else {
             mainOperator.endInput();
             mainOperator.finish();
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
index c8e6ac37f9e..c1b35bed4c5 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -37,6 +38,7 @@ import 
org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import java.util.Iterator;
 import java.util.Optional;
@@ -60,6 +62,8 @@ class BoundedStreamTask<IN, OUT, OP extends 
OneInputStreamOperator<IN, OUT> & Bo
 
     private final Timestamper<IN> timestamper;
 
+    private ThrowingConsumer<StreamRecord<IN>, Exception> recordProcessor;
+
     BoundedStreamTask(
             Environment environment,
             Iterable<IN> input,
@@ -91,6 +95,7 @@ class BoundedStreamTask<IN, OUT, OP extends 
OneInputStreamOperator<IN, OUT> & Bo
         mainOperator = mainOperatorAndTimeService.f0;
         mainOperator.initializeState(createStreamTaskStateInitializer());
         mainOperator.open();
+        recordProcessor = 
RecordProcessorUtils.getRecordProcessor(mainOperator);
     }
 
     @Override
@@ -103,8 +108,7 @@ class BoundedStreamTask<IN, OUT, OP extends 
OneInputStreamOperator<IN, OUT> & Bo
                 streamRecord.setTimestamp(timestamp);
             }
 
-            mainOperator.setKeyContextElement1(streamRecord);
-            mainOperator.processElement(streamRecord);
+            recordProcessor.accept(streamRecord);
         } else {
             mainOperator.endInput();
             mainOperator.finish();
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
index 8cbb28bb67d..f8d0a74fa4d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
@@ -34,7 +34,7 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  * AbstractStreamOperatorV2}.
  */
 @Experimental
-public abstract class AbstractInput<IN, OUT> implements Input<IN> {
+public abstract class AbstractInput<IN, OUT> implements Input<IN>, 
KeyContextHandler {
     /**
      * {@code KeySelector} for extracting a key from an element being 
processed. This is used to
      * scope keyed state to a key. This is null if the operator is not a keyed 
operator.
@@ -75,4 +75,9 @@ public abstract class AbstractInput<IN, OUT> implements 
Input<IN> {
     public void setKeyContextElement(StreamRecord record) throws Exception {
         owner.internalSetKeyContextElement(record, stateKeySelector);
     }
+
+    @Override
+    public boolean hasKeyContext() {
+        return stateKeySelector != null;
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index d2a24dee4c7..78fb35af4e0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -89,6 +89,7 @@ public abstract class AbstractStreamOperator<OUT>
         implements StreamOperator<OUT>,
                 SetupableStreamOperator<OUT>,
                 CheckpointedStreamOperator,
+                KeyContextHandler,
                 Serializable {
     private static final long serialVersionUID = 1L;
 
@@ -483,6 +484,18 @@ public abstract class AbstractStreamOperator<OUT>
         setKeyContextElement(record, stateKeySelector2);
     }
 
+    @Internal
+    @Override
+    public boolean hasKeyContext1() {
+        return stateKeySelector1 != null;
+    }
+
+    @Internal
+    @Override
+    public boolean hasKeyContext2() {
+        return stateKeySelector2 != null;
+    }
+
     private <T> void setKeyContextElement(StreamRecord<T> record, 
KeySelector<T, ?> selector)
             throws Exception {
         if (selector != null) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContextHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContextHandler.java
new file mode 100644
index 00000000000..56d6a417b8e
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContextHandler.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * This interface is used to optimize the calls of {@link 
Input#setKeyContextElement}, {@link
+ * StreamOperator#setKeyContextElement1} and {@link 
StreamOperator#setKeyContextElement2}. We can
+ * decide(at the inputs/operators initialization) whether to omit the calls of
+ * "setKeyContextElement" according to the return value of {@link 
#hasKeyContext}. In this way, we
+ * can omit the calls of "setKeyContextElement" for inputs/operators that 
don't have "KeyContext".
+ *
+ * <p>All inputs/operators that want to optimize the "setKeyContextElement" 
calls should implement
+ * this interface.
+ */
+@Internal
+public interface KeyContextHandler {
+
+    /**
+     * Whether the {@link Input} has "KeyContext". If false, we can omit the 
call of {@link
+     * Input#setKeyContextElement} for each record.
+     *
+     * @return True if the {@link Input} has "KeyContext", false otherwise.
+     */
+    default boolean hasKeyContext() {
+        return hasKeyContext1();
+    }
+
+    /**
+     * Whether the first input of {@link StreamOperator} has "KeyContext". If 
false, we can omit the
+     * call of {@link StreamOperator#setKeyContextElement1} for each record 
arrived on the first
+     * input.
+     *
+     * @return True if the first input has "KeyContext", false otherwise.
+     */
+    default boolean hasKeyContext1() {
+        return true;
+    }
+
+    /**
+     * Whether the second input of {@link StreamOperator} has "KeyContext". If 
false, we can omit
+     * the call of {@link StreamOperator#setKeyContextElement1} for each 
record arrived on the
+     * second input.
+     *
+     * @return True if the second input has "KeyContext", false otherwise.
+     */
+    default boolean hasKeyContext2() {
+        return true;
+    }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java
new file mode 100644
index 00000000000..f1636f3285d
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.KeyContextHandler;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+/** Utility class for creating record processor for {@link Input} {@link 
StreamOperator}. */
+public class RecordProcessorUtils {
+
+    private static final String METHOD_SET_KEY_CONTEXT_ELEMENT = 
"setKeyContextElement";
+    private static final String METHOD_SET_KEY_CONTEXT_ELEMENT1 = 
"setKeyContextElement1";
+    private static final String METHOD_SET_KEY_CONTEXT_ELEMENT2 = 
"setKeyContextElement2";
+
+    /**
+     * Get record processor for {@link Input}, which will omit call of {@link
+     * Input#setKeyContextElement} if it doesn't have key context.
+     *
+     * @param input the {@link Input}
+     * @return the record processor
+     */
+    public static <T> ThrowingConsumer<StreamRecord<T>, Exception> 
getRecordProcessor(
+            Input<T> input) {
+        boolean canOmitSetKeyContext;
+        if (input instanceof AbstractStreamOperator) {
+            canOmitSetKeyContext = 
canOmitSetKeyContext((AbstractStreamOperator<?>) input, 0);
+        } else {
+            canOmitSetKeyContext =
+                    input instanceof KeyContextHandler
+                            && !((KeyContextHandler) input).hasKeyContext();
+        }
+
+        if (canOmitSetKeyContext) {
+            return input::processElement;
+        } else {
+            return record -> {
+                input.setKeyContextElement(record);
+                input.processElement(record);
+            };
+        }
+    }
+
+    /**
+     * Get record processor for the first input of {@link 
TwoInputStreamOperator}, which will omit
+     * call of {@link StreamOperator#setKeyContextElement1} if it doesn't have 
key context.
+     *
+     * @param operator the {@link TwoInputStreamOperator}
+     * @return the record processor
+     */
+    public static <T> ThrowingConsumer<StreamRecord<T>, Exception> 
getRecordProcessor1(
+            TwoInputStreamOperator<T, ?, ?> operator) {
+        boolean canOmitSetKeyContext;
+        if (operator instanceof AbstractStreamOperator) {
+            canOmitSetKeyContext = 
canOmitSetKeyContext((AbstractStreamOperator<?>) operator, 0);
+        } else {
+            canOmitSetKeyContext =
+                    operator instanceof KeyContextHandler
+                            && !((KeyContextHandler) 
operator).hasKeyContext1();
+        }
+
+        if (canOmitSetKeyContext) {
+            return operator::processElement1;
+        } else {
+            return record -> {
+                operator.setKeyContextElement1(record);
+                operator.processElement1(record);
+            };
+        }
+    }
+
+    /**
+     * Get record processor for the second input of {@link 
TwoInputStreamOperator}, which will omit
+     * call of {@link StreamOperator#setKeyContextElement2} if it doesn't have 
key context.
+     *
+     * @param operator the {@link TwoInputStreamOperator}
+     * @return the record processor
+     */
+    public static <T> ThrowingConsumer<StreamRecord<T>, Exception> 
getRecordProcessor2(
+            TwoInputStreamOperator<?, T, ?> operator) {
+        boolean canOmitSetKeyContext;
+        if (operator instanceof AbstractStreamOperator) {
+            canOmitSetKeyContext = 
canOmitSetKeyContext((AbstractStreamOperator<?>) operator, 1);
+        } else {
+            canOmitSetKeyContext =
+                    operator instanceof KeyContextHandler
+                            && !((KeyContextHandler) 
operator).hasKeyContext2();
+        }
+
+        if (canOmitSetKeyContext) {
+            return operator::processElement2;
+        } else {
+            return record -> {
+                operator.setKeyContextElement2(record);
+                operator.processElement2(record);
+            };
+        }
+    }
+
+    private static boolean canOmitSetKeyContext(
+            AbstractStreamOperator<?> streamOperator, int input) {
+        // Since AbstractStreamOperator is @PublicEvolving, we need to check 
whether the
+        // "SetKeyContextElement" is overridden by the (user-implemented) 
subclass. If it is
+        // overridden, we cannot omit it due to the subclass may maintain 
different key selectors on
+        // its own.
+        return !hasKeyContext(streamOperator, input)
+                && !methodSetKeyContextIsOverridden(streamOperator, input);
+    }
+
+    private static boolean hasKeyContext(AbstractStreamOperator<?> operator, 
int input) {
+        if (input == 0) {
+            return operator.hasKeyContext1();
+        } else {
+            return operator.hasKeyContext2();
+        }
+    }
+
+    private static boolean methodSetKeyContextIsOverridden(
+            AbstractStreamOperator<?> operator, int input) {
+        if (input == 0) {
+            if (operator instanceof OneInputStreamOperator) {
+                return methodIsOverridden(
+                                operator,
+                                OneInputStreamOperator.class,
+                                METHOD_SET_KEY_CONTEXT_ELEMENT,
+                                StreamRecord.class)
+                        || methodIsOverridden(
+                                operator,
+                                AbstractStreamOperator.class,
+                                METHOD_SET_KEY_CONTEXT_ELEMENT1,
+                                StreamRecord.class);
+            } else {
+                return methodIsOverridden(
+                        operator,
+                        AbstractStreamOperator.class,
+                        METHOD_SET_KEY_CONTEXT_ELEMENT1,
+                        StreamRecord.class);
+            }
+        } else {
+            return methodIsOverridden(
+                    operator,
+                    AbstractStreamOperator.class,
+                    METHOD_SET_KEY_CONTEXT_ELEMENT2,
+                    StreamRecord.class);
+        }
+    }
+
+    private static boolean methodIsOverridden(
+            AbstractStreamOperator<?> operator,
+            Class<?> expectedDeclaringClass,
+            String methodName,
+            Class<?>... parameterTypes) {
+        try {
+            Class<?> methodDeclaringClass =
+                    operator.getClass().getMethod(methodName, 
parameterTypes).getDeclaringClass();
+            return methodDeclaringClass != expectedDeclaringClass;
+        } catch (NoSuchMethodException exception) {
+            throw new FlinkRuntimeException(
+                    String.format(
+                            "BUG: Can't find '%s' method in '%s'",
+                            methodName, operator.getClass()));
+        }
+    }
+
+    /** Private constructor to prevent instantiation. */
+    private RecordProcessorUtils() {}
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
index 9afa00e4cbd..f06e9e71fe8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
@@ -50,6 +50,7 @@ import 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
 import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
 import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import java.util.Arrays;
 import java.util.List;
@@ -248,6 +249,9 @@ public class StreamMultipleInputProcessorFactory {
 
         private final Counter networkRecordsIn;
 
+        /** The function way is only used for frequent record processing as 
for JIT optimization. */
+        private final ThrowingConsumer<StreamRecord<T>, Exception> 
recordConsumer;
+
         private StreamTaskNetworkOutput(
                 Input<T> input,
                 WatermarkGauge inputWatermarkGauge,
@@ -257,12 +261,12 @@ public class StreamMultipleInputProcessorFactory {
             this.inputWatermarkGauge = checkNotNull(inputWatermarkGauge);
             this.mainOperatorRecordsIn = mainOperatorRecordsIn;
             this.networkRecordsIn = networkRecordsIn;
+            this.recordConsumer = 
RecordProcessorUtils.getRecordProcessor(input);
         }
 
         @Override
         public void emitRecord(StreamRecord<T> record) throws Exception {
-            input.setKeyContextElement(record);
-            input.processElement(record);
+            recordConsumer.accept(record);
             mainOperatorRecordsIn.inc();
             networkRecordsIn.inc();
         }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
index 8b4330a1223..ea296a3283c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
@@ -180,7 +180,7 @@ public class StreamTwoInputProcessorFactory {
         StreamTaskNetworkOutput<IN1> output1 =
                 new StreamTaskNetworkOutput<>(
                         streamOperator,
-                        record -> processRecord1(record, streamOperator),
+                        
RecordProcessorUtils.getRecordProcessor1(streamOperator),
                         input1WatermarkGauge,
                         0,
                         numRecordsIn,
@@ -191,7 +191,7 @@ public class StreamTwoInputProcessorFactory {
         StreamTaskNetworkOutput<IN2> output2 =
                 new StreamTaskNetworkOutput<>(
                         streamOperator,
-                        record -> processRecord2(record, streamOperator),
+                        
RecordProcessorUtils.getRecordProcessor2(streamOperator),
                         input2WatermarkGauge,
                         1,
                         numRecordsIn,
@@ -209,22 +209,6 @@ public class StreamTwoInputProcessorFactory {
         return (StreamTaskInput<IN1>) multiInput;
     }
 
-    private static <T> void processRecord1(
-            StreamRecord<T> record, TwoInputStreamOperator<T, ?, ?> 
streamOperator)
-            throws Exception {
-
-        streamOperator.setKeyContextElement1(record);
-        streamOperator.processElement1(record);
-    }
-
-    private static <T> void processRecord2(
-            StreamRecord<T> record, TwoInputStreamOperator<?, T, ?> 
streamOperator)
-            throws Exception {
-
-        streamOperator.setKeyContextElement2(record);
-        streamOperator.processElement2(record);
-    }
-
     /**
      * The network data output implementation used for processing stream 
elements from {@link
      * StreamTaskNetworkInput} in two input selective processor.
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
index 0bf275b2cd7..0ff3785056f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
@@ -23,11 +23,13 @@ import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.streaming.api.operators.Input;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +45,7 @@ class ChainingOutput<T> implements 
WatermarkGaugeExposingOutput<StreamRecord<T>>
     protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
     @Nullable protected final OutputTag<T> outputTag;
     protected WatermarkStatus announcedStatus = WatermarkStatus.ACTIVE;
+    protected final ThrowingConsumer<StreamRecord<T>, Exception> 
recordProcessor;
 
     public ChainingOutput(
             Input<T> input,
@@ -59,6 +62,7 @@ class ChainingOutput<T> implements 
WatermarkGaugeExposingOutput<StreamRecord<T>>
         }
         this.numRecordsIn = 
curOperatorMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
         this.outputTag = outputTag;
+        this.recordProcessor = RecordProcessorUtils.getRecordProcessor(input);
     }
 
     @Override
@@ -87,8 +91,7 @@ class ChainingOutput<T> implements 
WatermarkGaugeExposingOutput<StreamRecord<T>>
 
             numRecordsOut.inc();
             numRecordsIn.inc();
-            input.setKeyContextElement(castRecord);
-            input.processElement(castRecord);
+            recordProcessor.accept(castRecord);
         } catch (Exception e) {
             throw new ExceptionInChainedOperatorException(e);
         }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java
index c9091165ab7..c27d3a83490 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java
@@ -72,8 +72,7 @@ final class CopyingChainingOutput<T> extends 
ChainingOutput<T> {
             numRecordsOut.inc();
             numRecordsIn.inc();
             StreamRecord<T> copy = 
castRecord.copy(serializer.copy(castRecord.getValue()));
-            input.setKeyContextElement(copy);
-            input.processElement(copy);
+            recordProcessor.accept(copy);
         } catch (ClassCastException e) {
             if (outputTag != null) {
                 // Enrich error message
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 1e445d05334..ddcb0182633 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.sort.SortingDataInput;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput;
+import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
 import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
 import org.apache.flink.streaming.runtime.io.StreamTaskInput;
 import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput;
@@ -44,6 +45,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.apache.flink.shaded.curator5.com.google.common.collect.Iterables;
 
@@ -217,6 +219,7 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
 
         private final WatermarkGauge watermarkGauge;
         private final Counter numRecordsIn;
+        private final ThrowingConsumer<StreamRecord<IN>, Exception> 
recordProcessor;
 
         private StreamTaskNetworkOutput(
                 Input<IN> operator, WatermarkGauge watermarkGauge, Counter 
numRecordsIn) {
@@ -224,13 +227,13 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
             this.operator = checkNotNull(operator);
             this.watermarkGauge = checkNotNull(watermarkGauge);
             this.numRecordsIn = checkNotNull(numRecordsIn);
+            this.recordProcessor = 
RecordProcessorUtils.getRecordProcessor(operator);
         }
 
         @Override
         public void emitRecord(StreamRecord<IN> record) throws Exception {
             numRecordsIn.inc();
-            operator.setKeyContextElement(record);
-            operator.processElement(record);
+            recordProcessor.accept(record);
         }
 
         @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtilsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtilsTest.java
new file mode 100644
index 00000000000..69d9703e007
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtilsTest.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.KeyContextHandler;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskITCase;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.streaming.util.MockOutput;
+import org.apache.flink.streaming.util.MockStreamConfig;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link RecordProcessorUtils}. */
+class RecordProcessorUtilsTest {
+
+    @Test
+    void testGetRecordProcessor() throws Exception {
+        TestOperator input1 = new TestOperator();
+        TestOperator input2 = new TestKeyContextHandlerOperator(true);
+        TestOperator input3 = new TestKeyContextHandlerOperator(false);
+
+        RecordProcessorUtils.getRecordProcessor(input1).accept(new 
StreamRecord<>("test"));
+        assertThat(input1.setKeyContextElementCalled).isTrue();
+        assertThat(input1.processElementCalled).isTrue();
+
+        RecordProcessorUtils.getRecordProcessor(input2).accept(new 
StreamRecord<>("test"));
+        assertThat(input2.setKeyContextElementCalled).isTrue();
+        assertThat(input2.processElementCalled).isTrue();
+
+        RecordProcessorUtils.getRecordProcessor(input3).accept(new 
StreamRecord<>("test"));
+        assertThat(input3.setKeyContextElementCalled).isFalse();
+        assertThat(input3.processElementCalled).isTrue();
+    }
+
+    @Test
+    void testGetRecordProcessor1() throws Exception {
+        TestOperator operator1 = new TestOperator();
+        TestOperator operator2 = new TestKeyContextHandlerOperator(true, true);
+        TestOperator operator3 = new TestKeyContextHandlerOperator(false, 
true);
+
+        RecordProcessorUtils.getRecordProcessor1(operator1).accept(new 
StreamRecord<>("test"));
+        assertThat(operator1.setKeyContextElement1Called).isTrue();
+        assertThat(operator1.processElement1Called).isTrue();
+
+        RecordProcessorUtils.getRecordProcessor1(operator2).accept(new 
StreamRecord<>("test"));
+        assertThat(operator2.setKeyContextElement1Called).isTrue();
+        assertThat(operator2.processElement1Called).isTrue();
+
+        RecordProcessorUtils.getRecordProcessor1(operator3).accept(new 
StreamRecord<>("test"));
+        assertThat(operator3.setKeyContextElement1Called).isFalse();
+        assertThat(operator3.processElement1Called).isTrue();
+    }
+
+    @Test
+    void testGetRecordProcessor2() throws Exception {
+        TestOperator operator1 = new TestOperator();
+        TestOperator operator2 = new TestKeyContextHandlerOperator(true, true);
+        TestOperator operator3 = new TestKeyContextHandlerOperator(true, 
false);
+
+        RecordProcessorUtils.getRecordProcessor2(operator1).accept(new 
StreamRecord<>("test"));
+        assertThat(operator1.setKeyContextElement2Called).isTrue();
+        assertThat(operator1.processElement2Called).isTrue();
+
+        RecordProcessorUtils.getRecordProcessor2(operator2).accept(new 
StreamRecord<>("test"));
+        assertThat(operator2.setKeyContextElement2Called).isTrue();
+        assertThat(operator2.processElement2Called).isTrue();
+
+        RecordProcessorUtils.getRecordProcessor2(operator3).accept(new 
StreamRecord<>("test"));
+        assertThat(operator3.setKeyContextElement2Called).isFalse();
+        assertThat(operator3.processElement2Called).isTrue();
+    }
+
+    @Test
+    void testOverrideSetKeyContextElementForOneInputStreamOperator() throws 
Exception {
+        // test no override
+        NoOverrideOneInputStreamOperator noOverride = new 
NoOverrideOneInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor(noOverride).accept(new 
StreamRecord<>("test"));
+        assertThat(noOverride.setCurrentKeyCalled).isFalse();
+
+        // test override "SetKeyContextElement"
+        OverrideSetKeyContextOneInputStreamOperator overrideSetKeyContext =
+                new OverrideSetKeyContextOneInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor(overrideSetKeyContext)
+                .accept(new StreamRecord<>("test"));
+        assertThat(overrideSetKeyContext.setKeyContextElementCalled).isTrue();
+
+        // test override "SetKeyContextElement1"
+        OverrideSetKeyContext1OneInputStreamOperator overrideSetKeyContext1 =
+                new OverrideSetKeyContext1OneInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor(overrideSetKeyContext1)
+                .accept(new StreamRecord<>("test"));
+        
assertThat(overrideSetKeyContext1.setKeyContextElement1Called).isTrue();
+    }
+
+    @Test
+    void testOverrideSetKeyContextElementForTwoInputStreamOperator() throws 
Exception {
+        // test no override
+        NoOverrideTwoInputStreamOperator noOverride = new 
NoOverrideTwoInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor1(noOverride).accept(new 
StreamRecord<>("test"));
+        RecordProcessorUtils.getRecordProcessor2(noOverride).accept(new 
StreamRecord<>("test"));
+        assertThat(noOverride.setCurrentKeyCalled).isFalse();
+
+        // test override "SetKeyContextElement1" and "SetKeyContextElement2"
+        OverrideSetKeyContext1AndSetKeyContext2TwoInputStreamOperator override 
=
+                new 
OverrideSetKeyContext1AndSetKeyContext2TwoInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor1(override).accept(new 
StreamRecord<>("test"));
+        RecordProcessorUtils.getRecordProcessor2(override).accept(new 
StreamRecord<>("test"));
+        assertThat(override.setKeyContextElement1Called).isTrue();
+        assertThat(override.setKeyContextElement2Called).isTrue();
+    }
+
+    private static class NoOverrideOperator extends 
AbstractStreamOperator<String> {
+
+        boolean setCurrentKeyCalled = false;
+
+        NoOverrideOperator() throws Exception {
+            super();
+            // For case that "SetKeyContextElement" has not been overridden,
+            // we can determine whether the "SetKeyContextElement" is called 
through
+            // "setCurrentKey". According to the implementation, we need to 
make the
+            // "stateKeySelector1/stateKeySelector2" not null. Besides, we 
override the
+            // "hasKeyContext1" and "hasKeyContext2" to avoid 
"stateKeySelector1/stateKeySelector2"
+            // from affecting the return value
+            Configuration configuration = new Configuration();
+            KeySelector keySelector = x -> x;
+            InstantiationUtil.writeObjectToConfig(keySelector, configuration, 
"statePartitioner0");
+            InstantiationUtil.writeObjectToConfig(keySelector, configuration, 
"statePartitioner1");
+            setup(
+                    new StreamTaskITCase.NoOpStreamTask<>(new 
DummyEnvironment()),
+                    new MockStreamConfig(configuration, 1),
+                    new MockOutput<>(new ArrayList<>()));
+        }
+
+        @Override
+        public boolean hasKeyContext1() {
+            return false;
+        }
+
+        @Override
+        public boolean hasKeyContext2() {
+            return false;
+        }
+
+        @Override
+        public void setCurrentKey(Object key) {
+            setCurrentKeyCalled = true;
+        }
+    }
+
+    private static class NoOverrideOneInputStreamOperator extends 
NoOverrideOperator
+            implements OneInputStreamOperator<String, String> {
+
+        NoOverrideOneInputStreamOperator() throws Exception {
+            super();
+        }
+
+        @Override
+        public void processElement(StreamRecord<String> element) throws 
Exception {}
+    }
+
+    private static class OverrideSetKeyContextOneInputStreamOperator
+            extends NoOverrideOneInputStreamOperator {
+        boolean setKeyContextElementCalled = false;
+
+        OverrideSetKeyContextOneInputStreamOperator() throws Exception {
+            super();
+        }
+
+        @Override
+        public void setKeyContextElement(StreamRecord<String> record) throws 
Exception {
+            setKeyContextElementCalled = true;
+        }
+    }
+
+    private static class OverrideSetKeyContext1OneInputStreamOperator
+            extends NoOverrideOneInputStreamOperator {
+        boolean setKeyContextElement1Called = false;
+
+        OverrideSetKeyContext1OneInputStreamOperator() throws Exception {
+            super();
+        }
+
+        @Override
+        public void setKeyContextElement1(StreamRecord record) throws 
Exception {
+            setKeyContextElement1Called = true;
+        }
+    }
+
+    private static class NoOverrideTwoInputStreamOperator extends 
NoOverrideOperator
+            implements TwoInputStreamOperator<String, String, String> {
+
+        NoOverrideTwoInputStreamOperator() throws Exception {
+            super();
+        }
+
+        @Override
+        public void processElement1(StreamRecord<String> element) throws 
Exception {}
+
+        @Override
+        public void processElement2(StreamRecord<String> element) throws 
Exception {}
+    }
+
+    private static class 
OverrideSetKeyContext1AndSetKeyContext2TwoInputStreamOperator
+            extends NoOverrideTwoInputStreamOperator {
+
+        boolean setKeyContextElement1Called = false;
+
+        boolean setKeyContextElement2Called = false;
+
+        OverrideSetKeyContext1AndSetKeyContext2TwoInputStreamOperator() throws 
Exception {
+            super();
+        }
+
+        @Override
+        public void setKeyContextElement1(StreamRecord record) throws 
Exception {
+            setKeyContextElement1Called = true;
+        }
+
+        @Override
+        public void setKeyContextElement2(StreamRecord record) throws 
Exception {
+            setKeyContextElement2Called = true;
+        }
+    }
+
+    private static class TestOperator
+            implements Input<String>, TwoInputStreamOperator<String, String, 
String> {
+        boolean setKeyContextElementCalled = false;
+        boolean processElementCalled = false;
+
+        boolean setKeyContextElement1Called = false;
+        boolean processElement1Called = false;
+
+        boolean setKeyContextElement2Called = false;
+        boolean processElement2Called = false;
+
+        @Override
+        public void processElement(StreamRecord<String> element) throws 
Exception {
+            processElementCalled = true;
+        }
+
+        @Override
+        public void processWatermark(Watermark mark) throws Exception {}
+
+        @Override
+        public void processWatermarkStatus(WatermarkStatus watermarkStatus) 
throws Exception {}
+
+        @Override
+        public void processLatencyMarker(LatencyMarker latencyMarker) throws 
Exception {}
+
+        @Override
+        public void setKeyContextElement(StreamRecord<String> record) throws 
Exception {
+            setKeyContextElementCalled = true;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws 
Exception {}
+
+        @Override
+        public void setCurrentKey(Object key) {}
+
+        @Override
+        public Object getCurrentKey() {
+            return null;
+        }
+
+        @Override
+        public void open() throws Exception {}
+
+        @Override
+        public void finish() throws Exception {}
+
+        @Override
+        public void close() throws Exception {}
+
+        @Override
+        public void prepareSnapshotPreBarrier(long checkpointId) throws 
Exception {}
+
+        @Override
+        public OperatorSnapshotFutures snapshotState(
+                long checkpointId,
+                long timestamp,
+                CheckpointOptions checkpointOptions,
+                CheckpointStreamFactory storageLocation)
+                throws Exception {
+            return null;
+        }
+
+        @Override
+        public void initializeState(StreamTaskStateInitializer 
streamTaskStateManager)
+                throws Exception {}
+
+        @Override
+        public OperatorMetricGroup getMetricGroup() {
+            return null;
+        }
+
+        @Override
+        public OperatorID getOperatorID() {
+            return null;
+        }
+
+        @Override
+        public void setKeyContextElement1(StreamRecord<?> record) throws 
Exception {
+            setKeyContextElement1Called = true;
+        }
+
+        @Override
+        public void setKeyContextElement2(StreamRecord<?> record) throws 
Exception {
+            setKeyContextElement2Called = true;
+        }
+
+        @Override
+        public void processElement1(StreamRecord<String> element) throws 
Exception {
+            processElement1Called = true;
+        }
+
+        @Override
+        public void processElement2(StreamRecord<String> element) throws 
Exception {
+            processElement2Called = true;
+        }
+
+        @Override
+        public void processWatermark1(Watermark mark) throws Exception {}
+
+        @Override
+        public void processWatermark2(Watermark mark) throws Exception {}
+
+        @Override
+        public void processLatencyMarker1(LatencyMarker latencyMarker) throws 
Exception {}
+
+        @Override
+        public void processLatencyMarker2(LatencyMarker latencyMarker) throws 
Exception {}
+
+        @Override
+        public void processWatermarkStatus1(WatermarkStatus watermarkStatus) 
throws Exception {}
+
+        @Override
+        public void processWatermarkStatus2(WatermarkStatus watermarkStatus) 
throws Exception {}
+    }
+
+    private static class TestKeyContextHandlerOperator extends TestOperator
+            implements KeyContextHandler {
+        private final boolean hasKeyContext1;
+        private final boolean hasKeyContext2;
+
+        TestKeyContextHandlerOperator(boolean hasKeyContext) {
+            this.hasKeyContext1 = hasKeyContext;
+            this.hasKeyContext2 = true;
+        }
+
+        TestKeyContextHandlerOperator(boolean hasKeyContext1, boolean 
hasKeyContext2) {
+            this.hasKeyContext1 = hasKeyContext1;
+            this.hasKeyContext2 = hasKeyContext2;
+        }
+
+        @Override
+        public boolean hasKeyContext() {
+            return hasKeyContext1;
+        }
+
+        @Override
+        public boolean hasKeyContext1() {
+            return hasKeyContext1;
+        }
+
+        @Override
+        public boolean hasKeyContext2() {
+            return hasKeyContext2;
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/InputBase.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/InputBase.java
index 2343441bebc..4d0a98ac4a6 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/InputBase.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/InputBase.java
@@ -19,15 +19,25 @@
 package org.apache.flink.table.runtime.operators.multipleinput.input;
 
 import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.KeyContextHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
 import 
org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase;
 
 /** Base {@link Input} used in {@link MultipleInputStreamOperatorBase}. */
-public abstract class InputBase implements Input<RowData> {
+public abstract class InputBase implements Input<RowData>, KeyContextHandler {
 
     @Override
     public void setKeyContextElement(StreamRecord<RowData> record) throws 
Exception {
         // do nothing
     }
+
+    @Override
+    public boolean hasKeyContext() {
+        // Currently, we can simply return false due to 
InputBase#setKeyContextElement is an empty
+        // implementation. Once there is a non-empty implementation in the 
future, this method
+        // should also be adapted, otherwise the 
InputBase#setKeyContextElement will never be
+        // called.
+        return false;
+    }
 }

Reply via email to