This is an automated email from the ASF dual-hosted git repository. wanglijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 4e3cd986cd4 [FLINK-30601][runtime] Omit "setKeyContextElement" call for non-keyed stream/operators to improve performance 4e3cd986cd4 is described below commit 4e3cd986cd4304c699d5c7d368ffd6e0ead5a096 Author: Lijie Wang <wangdachui9...@gmail.com> AuthorDate: Mon Jan 9 14:15:02 2023 +0800 [FLINK-30601][runtime] Omit "setKeyContextElement" call for non-keyed stream/operators to improve performance This closes #21621 --- .../state/api/output/BootstrapStreamTask.java | 10 +- .../flink/state/api/output/BoundedStreamTask.java | 8 +- .../streaming/api/operators/AbstractInput.java | 7 +- .../api/operators/AbstractStreamOperator.java | 13 + .../streaming/api/operators/KeyContextHandler.java | 67 ++++ .../streaming/runtime/io/RecordProcessorUtils.java | 188 ++++++++++ .../io/StreamMultipleInputProcessorFactory.java | 8 +- .../runtime/io/StreamTwoInputProcessorFactory.java | 20 +- .../streaming/runtime/tasks/ChainingOutput.java | 7 +- .../runtime/tasks/CopyingChainingOutput.java | 3 +- .../runtime/tasks/OneInputStreamTask.java | 7 +- .../runtime/io/RecordProcessorUtilsTest.java | 407 +++++++++++++++++++++ .../operators/multipleinput/input/InputBase.java | 12 +- 13 files changed, 723 insertions(+), 34 deletions(-) diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BootstrapStreamTask.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BootstrapStreamTask.java index 8a5e832b28f..936fe88b846 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BootstrapStreamTask.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BootstrapStreamTask.java @@ -27,12 +27,14 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import 
org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil; +import org.apache.flink.streaming.runtime.io.RecordProcessorUtils; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.ThrowingConsumer; import java.util.Optional; import java.util.concurrent.BlockingQueue; @@ -53,6 +55,8 @@ class BootstrapStreamTask<IN, OUT, OP extends OneInputStreamOperator<IN, OUT> & private final Output<StreamRecord<OUT>> output; + private ThrowingConsumer<StreamRecord<IN>, Exception> recordProcessor; + BootstrapStreamTask( Environment environment, BlockingQueue<StreamElement> input, @@ -82,16 +86,14 @@ class BootstrapStreamTask<IN, OUT, OP extends OneInputStreamOperator<IN, OUT> & mainOperator = mainOperatorAndTimeService.f0; mainOperator.initializeState(createStreamTaskStateInitializer()); mainOperator.open(); + recordProcessor = RecordProcessorUtils.getRecordProcessor(mainOperator); } @Override protected void processInput(MailboxDefaultAction.Controller controller) throws Exception { StreamElement element = input.take(); if (element.isRecord()) { - StreamRecord<IN> streamRecord = element.asRecord(); - - mainOperator.setKeyContextElement1(streamRecord); - mainOperator.processElement(streamRecord); + recordProcessor.accept(element.asRecord()); } else { mainOperator.endInput(); mainOperator.finish(); diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java index c8e6ac37f9e..c1b35bed4c5 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java @@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.RecordProcessorUtils; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -37,6 +38,7 @@ import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.ThrowingConsumer; import java.util.Iterator; import java.util.Optional; @@ -60,6 +62,8 @@ class BoundedStreamTask<IN, OUT, OP extends OneInputStreamOperator<IN, OUT> & Bo private final Timestamper<IN> timestamper; + private ThrowingConsumer<StreamRecord<IN>, Exception> recordProcessor; + BoundedStreamTask( Environment environment, Iterable<IN> input, @@ -91,6 +95,7 @@ class BoundedStreamTask<IN, OUT, OP extends OneInputStreamOperator<IN, OUT> & Bo mainOperator = mainOperatorAndTimeService.f0; mainOperator.initializeState(createStreamTaskStateInitializer()); mainOperator.open(); + recordProcessor = RecordProcessorUtils.getRecordProcessor(mainOperator); } @Override @@ -103,8 +108,7 @@ class BoundedStreamTask<IN, OUT, OP extends OneInputStreamOperator<IN, OUT> & Bo 
streamRecord.setTimestamp(timestamp); } - mainOperator.setKeyContextElement1(streamRecord); - mainOperator.processElement(streamRecord); + recordProcessor.accept(streamRecord); } else { mainOperator.endInput(); mainOperator.finish(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java index 8cbb28bb67d..f8d0a74fa4d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java @@ -34,7 +34,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; * AbstractStreamOperatorV2}. */ @Experimental -public abstract class AbstractInput<IN, OUT> implements Input<IN> { +public abstract class AbstractInput<IN, OUT> implements Input<IN>, KeyContextHandler { /** * {@code KeySelector} for extracting a key from an element being processed. This is used to * scope keyed state to a key. This is null if the operator is not a keyed operator. 
@@ -75,4 +75,9 @@ public abstract class AbstractInput<IN, OUT> implements Input<IN> { public void setKeyContextElement(StreamRecord record) throws Exception { owner.internalSetKeyContextElement(record, stateKeySelector); } + + @Override + public boolean hasKeyContext() { + return stateKeySelector != null; + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index d2a24dee4c7..78fb35af4e0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -89,6 +89,7 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>, SetupableStreamOperator<OUT>, CheckpointedStreamOperator, + KeyContextHandler, Serializable { private static final long serialVersionUID = 1L; @@ -483,6 +484,18 @@ public abstract class AbstractStreamOperator<OUT> setKeyContextElement(record, stateKeySelector2); } + @Internal + @Override + public boolean hasKeyContext1() { + return stateKeySelector1 != null; + } + + @Internal + @Override + public boolean hasKeyContext2() { + return stateKeySelector2 != null; + } + private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception { if (selector != null) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContextHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContextHandler.java new file mode 100644 index 00000000000..56d6a417b8e --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContextHandler.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; + +/** + * This interface is used to optimize the calls of {@link Input#setKeyContextElement}, {@link + * StreamOperator#setKeyContextElement1} and {@link StreamOperator#setKeyContextElement2}. We can + * decide (at the inputs/operators initialization) whether to omit the calls of + * "setKeyContextElement" according to the return value of {@link #hasKeyContext}. In this way, we + * can omit the calls of "setKeyContextElement" for inputs/operators that don't have "KeyContext". + * + * <p>All inputs/operators that want to optimize the "setKeyContextElement" calls should implement + * this interface. + */ +@Internal +public interface KeyContextHandler { + + /** + * Whether the {@link Input} has "KeyContext". If false, we can omit the call of {@link + * Input#setKeyContextElement} for each record. + * + * @return True if the {@link Input} has "KeyContext", false otherwise. + */ + default boolean hasKeyContext() { + return hasKeyContext1(); + } + + /** + * Whether the first input of {@link StreamOperator} has "KeyContext". If false, we can omit the + * call of {@link StreamOperator#setKeyContextElement1} for each record arrived on the first + * input. 
+ * + * @return True if the first input has "KeyContext", false otherwise. + */ + default boolean hasKeyContext1() { + return true; + } + + /** + * Whether the second input of {@link StreamOperator} has "KeyContext". If false, we can omit + * the call of {@link StreamOperator#setKeyContextElement2} for each record arrived on the + * second input. + * + * @return True if the second input has "KeyContext", false otherwise. + */ + default boolean hasKeyContext2() { + return true; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java new file mode 100644 index 00000000000..f1636f3285d --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.Input; +import org.apache.flink.streaming.api.operators.KeyContextHandler; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.function.ThrowingConsumer; + +/** Utility class for creating record processor for {@link Input} and {@link StreamOperator}. */ +public class RecordProcessorUtils { + + private static final String METHOD_SET_KEY_CONTEXT_ELEMENT = "setKeyContextElement"; + private static final String METHOD_SET_KEY_CONTEXT_ELEMENT1 = "setKeyContextElement1"; + private static final String METHOD_SET_KEY_CONTEXT_ELEMENT2 = "setKeyContextElement2"; + + /** + * Get record processor for {@link Input}, which will omit call of {@link + * Input#setKeyContextElement} if it doesn't have key context. 
+ * + * @param input the {@link Input} + * @return the record processor + */ + public static <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcessor( + Input<T> input) { + boolean canOmitSetKeyContext; + if (input instanceof AbstractStreamOperator) { + canOmitSetKeyContext = canOmitSetKeyContext((AbstractStreamOperator<?>) input, 0); + } else { + canOmitSetKeyContext = + input instanceof KeyContextHandler + && !((KeyContextHandler) input).hasKeyContext(); + } + + if (canOmitSetKeyContext) { + return input::processElement; + } else { + return record -> { + input.setKeyContextElement(record); + input.processElement(record); + }; + } + } + + /** + * Get record processor for the first input of {@link TwoInputStreamOperator}, which will omit + * call of {@link StreamOperator#setKeyContextElement1} if it doesn't have key context. + * + * @param operator the {@link TwoInputStreamOperator} + * @return the record processor + */ + public static <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcessor1( + TwoInputStreamOperator<T, ?, ?> operator) { + boolean canOmitSetKeyContext; + if (operator instanceof AbstractStreamOperator) { + canOmitSetKeyContext = canOmitSetKeyContext((AbstractStreamOperator<?>) operator, 0); + } else { + canOmitSetKeyContext = + operator instanceof KeyContextHandler + && !((KeyContextHandler) operator).hasKeyContext1(); + } + + if (canOmitSetKeyContext) { + return operator::processElement1; + } else { + return record -> { + operator.setKeyContextElement1(record); + operator.processElement1(record); + }; + } + } + + /** + * Get record processor for the second input of {@link TwoInputStreamOperator}, which will omit + * call of {@link StreamOperator#setKeyContextElement2} if it doesn't have key context. 
+ * + * @param operator the {@link TwoInputStreamOperator} + * @return the record processor + */ + public static <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcessor2( + TwoInputStreamOperator<?, T, ?> operator) { + boolean canOmitSetKeyContext; + if (operator instanceof AbstractStreamOperator) { + canOmitSetKeyContext = canOmitSetKeyContext((AbstractStreamOperator<?>) operator, 1); + } else { + canOmitSetKeyContext = + operator instanceof KeyContextHandler + && !((KeyContextHandler) operator).hasKeyContext2(); + } + + if (canOmitSetKeyContext) { + return operator::processElement2; + } else { + return record -> { + operator.setKeyContextElement2(record); + operator.processElement2(record); + }; + } + } + + private static boolean canOmitSetKeyContext( + AbstractStreamOperator<?> streamOperator, int input) { + // Since AbstractStreamOperator is @PublicEvolving, we need to check whether the + // "SetKeyContextElement" is overridden by the (user-implemented) subclass. If it is + // overridden, we cannot omit it due to the subclass may maintain different key selectors on + // its own. 
+ return !hasKeyContext(streamOperator, input) + && !methodSetKeyContextIsOverridden(streamOperator, input); + } + + private static boolean hasKeyContext(AbstractStreamOperator<?> operator, int input) { + if (input == 0) { + return operator.hasKeyContext1(); + } else { + return operator.hasKeyContext2(); + } + } + + private static boolean methodSetKeyContextIsOverridden( + AbstractStreamOperator<?> operator, int input) { + if (input == 0) { + if (operator instanceof OneInputStreamOperator) { + return methodIsOverridden( + operator, + OneInputStreamOperator.class, + METHOD_SET_KEY_CONTEXT_ELEMENT, + StreamRecord.class) + || methodIsOverridden( + operator, + AbstractStreamOperator.class, + METHOD_SET_KEY_CONTEXT_ELEMENT1, + StreamRecord.class); + } else { + return methodIsOverridden( + operator, + AbstractStreamOperator.class, + METHOD_SET_KEY_CONTEXT_ELEMENT1, + StreamRecord.class); + } + } else { + return methodIsOverridden( + operator, + AbstractStreamOperator.class, + METHOD_SET_KEY_CONTEXT_ELEMENT2, + StreamRecord.class); + } + } + + private static boolean methodIsOverridden( + AbstractStreamOperator<?> operator, + Class<?> expectedDeclaringClass, + String methodName, + Class<?>... parameterTypes) { + try { + Class<?> methodDeclaringClass = + operator.getClass().getMethod(methodName, parameterTypes).getDeclaringClass(); + return methodDeclaringClass != expectedDeclaringClass; + } catch (NoSuchMethodException exception) { + throw new FlinkRuntimeException( + String.format( + "BUG: Can't find '%s' method in '%s'", + methodName, operator.getClass())); + } + } + + /** Private constructor to prevent instantiation. 
*/ + private RecordProcessorUtils() {} +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java index 9afa00e4cbd..f06e9e71fe8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java @@ -50,6 +50,7 @@ import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput; import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.util.function.ThrowingConsumer; import java.util.Arrays; import java.util.List; @@ -248,6 +249,9 @@ public class StreamMultipleInputProcessorFactory { private final Counter networkRecordsIn; + /** The function way is only used for frequent record processing as for JIT optimization. 
*/ + private final ThrowingConsumer<StreamRecord<T>, Exception> recordConsumer; + private StreamTaskNetworkOutput( Input<T> input, WatermarkGauge inputWatermarkGauge, @@ -257,12 +261,12 @@ public class StreamMultipleInputProcessorFactory { this.inputWatermarkGauge = checkNotNull(inputWatermarkGauge); this.mainOperatorRecordsIn = mainOperatorRecordsIn; this.networkRecordsIn = networkRecordsIn; + this.recordConsumer = RecordProcessorUtils.getRecordProcessor(input); } @Override public void emitRecord(StreamRecord<T> record) throws Exception { - input.setKeyContextElement(record); - input.processElement(record); + recordConsumer.accept(record); mainOperatorRecordsIn.inc(); networkRecordsIn.inc(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java index 8b4330a1223..ea296a3283c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java @@ -180,7 +180,7 @@ public class StreamTwoInputProcessorFactory { StreamTaskNetworkOutput<IN1> output1 = new StreamTaskNetworkOutput<>( streamOperator, - record -> processRecord1(record, streamOperator), + RecordProcessorUtils.getRecordProcessor1(streamOperator), input1WatermarkGauge, 0, numRecordsIn, @@ -191,7 +191,7 @@ public class StreamTwoInputProcessorFactory { StreamTaskNetworkOutput<IN2> output2 = new StreamTaskNetworkOutput<>( streamOperator, - record -> processRecord2(record, streamOperator), + RecordProcessorUtils.getRecordProcessor2(streamOperator), input2WatermarkGauge, 1, numRecordsIn, @@ -209,22 +209,6 @@ public class StreamTwoInputProcessorFactory { return (StreamTaskInput<IN1>) multiInput; } - private static <T> void processRecord1( - StreamRecord<T> record, 
TwoInputStreamOperator<T, ?, ?> streamOperator) - throws Exception { - - streamOperator.setKeyContextElement1(record); - streamOperator.processElement1(record); - } - - private static <T> void processRecord2( - StreamRecord<T> record, TwoInputStreamOperator<?, T, ?> streamOperator) - throws Exception { - - streamOperator.setKeyContextElement2(record); - streamOperator.processElement2(record); - } - /** * The network data output implementation used for processing stream elements from {@link * StreamTaskNetworkInput} in two input selective processor. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java index 0bf275b2cd7..0ff3785056f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java @@ -23,11 +23,13 @@ import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.streaming.api.operators.Input; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.RecordProcessorUtils; import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.util.OutputTag; +import org.apache.flink.util.function.ThrowingConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +45,7 @@ class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> protected final WatermarkGauge watermarkGauge = new WatermarkGauge(); @Nullable protected final OutputTag<T> outputTag; protected WatermarkStatus 
announcedStatus = WatermarkStatus.ACTIVE; + protected final ThrowingConsumer<StreamRecord<T>, Exception> recordProcessor; public ChainingOutput( Input<T> input, @@ -59,6 +62,7 @@ class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> } this.numRecordsIn = curOperatorMetricGroup.getIOMetricGroup().getNumRecordsInCounter(); this.outputTag = outputTag; + this.recordProcessor = RecordProcessorUtils.getRecordProcessor(input); } @Override @@ -87,8 +91,7 @@ class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> numRecordsOut.inc(); numRecordsIn.inc(); - input.setKeyContextElement(castRecord); - input.processElement(castRecord); + recordProcessor.accept(castRecord); } catch (Exception e) { throw new ExceptionInChainedOperatorException(e); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java index c9091165ab7..c27d3a83490 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java @@ -72,8 +72,7 @@ final class CopyingChainingOutput<T> extends ChainingOutput<T> { numRecordsOut.inc(); numRecordsIn.inc(); StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue())); - input.setKeyContextElement(copy); - input.processElement(copy); + recordProcessor.accept(copy); } catch (ClassCastException e) { if (outputTag != null) { // Enrich error message diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 1e445d05334..ddcb0182633 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.sort.SortingDataInput; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput; +import org.apache.flink.streaming.runtime.io.RecordProcessorUtils; import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor; import org.apache.flink.streaming.runtime.io.StreamTaskInput; import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput; @@ -44,6 +45,7 @@ import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.util.function.ThrowingConsumer; import org.apache.flink.shaded.curator5.com.google.common.collect.Iterables; @@ -217,6 +219,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO private final WatermarkGauge watermarkGauge; private final Counter numRecordsIn; + private final ThrowingConsumer<StreamRecord<IN>, Exception> recordProcessor; private StreamTaskNetworkOutput( Input<IN> operator, WatermarkGauge watermarkGauge, Counter numRecordsIn) { @@ -224,13 +227,13 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO this.operator = checkNotNull(operator); this.watermarkGauge = checkNotNull(watermarkGauge); this.numRecordsIn = checkNotNull(numRecordsIn); + this.recordProcessor = RecordProcessorUtils.getRecordProcessor(operator); } @Override public void emitRecord(StreamRecord<IN> record) throws Exception { 
numRecordsIn.inc(); - operator.setKeyContextElement(record); - operator.processElement(record); + recordProcessor.accept(record); } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtilsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtilsTest.java new file mode 100644 index 00000000000..69d9703e007 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtilsTest.java @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.Input; +import org.apache.flink.streaming.api.operators.KeyContextHandler; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; +import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTaskITCase; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.streaming.util.MockOutput; +import org.apache.flink.streaming.util.MockStreamConfig; +import org.apache.flink.util.InstantiationUtil; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link RecordProcessorUtils}. 
*/
+class RecordProcessorUtilsTest {
+
+    @Test
+    void testGetRecordProcessor() throws Exception {
+        TestOperator input1 = new TestOperator();
+        TestOperator input2 = new TestKeyContextHandlerOperator(true);
+        TestOperator input3 = new TestKeyContextHandlerOperator(false);
+
+        RecordProcessorUtils.getRecordProcessor(input1).accept(new StreamRecord<>("test"));
+        assertThat(input1.setKeyContextElementCalled).isTrue();
+        assertThat(input1.processElementCalled).isTrue();
+
+        RecordProcessorUtils.getRecordProcessor(input2).accept(new StreamRecord<>("test"));
+        assertThat(input2.setKeyContextElementCalled).isTrue();
+        assertThat(input2.processElementCalled).isTrue();
+
+        RecordProcessorUtils.getRecordProcessor(input3).accept(new StreamRecord<>("test"));
+        assertThat(input3.setKeyContextElementCalled).isFalse();
+        assertThat(input3.processElementCalled).isTrue();
+    }
+
+    @Test
+    void testGetRecordProcessor1() throws Exception {
+        TestOperator operator1 = new TestOperator();
+        TestOperator operator2 = new TestKeyContextHandlerOperator(true, true);
+        TestOperator operator3 = new TestKeyContextHandlerOperator(false, true);
+
+        RecordProcessorUtils.getRecordProcessor1(operator1).accept(new StreamRecord<>("test"));
+        assertThat(operator1.setKeyContextElement1Called).isTrue();
+        assertThat(operator1.processElement1Called).isTrue();
+
+        RecordProcessorUtils.getRecordProcessor1(operator2).accept(new StreamRecord<>("test"));
+        assertThat(operator2.setKeyContextElement1Called).isTrue();
+        assertThat(operator2.processElement1Called).isTrue();
+
+        RecordProcessorUtils.getRecordProcessor1(operator3).accept(new StreamRecord<>("test"));
+        assertThat(operator3.setKeyContextElement1Called).isFalse();
+        assertThat(operator3.processElement1Called).isTrue();
+    }
+
+    @Test
+    void testGetRecordProcessor2() throws Exception {
+        TestOperator operator1 = new TestOperator();
+        TestOperator operator2 = new TestKeyContextHandlerOperator(true, true);
+        TestOperator operator3 = new TestKeyContextHandlerOperator(true, false);
+
+        RecordProcessorUtils.getRecordProcessor2(operator1).accept(new StreamRecord<>("test"));
+        assertThat(operator1.setKeyContextElement2Called).isTrue();
+        assertThat(operator1.processElement2Called).isTrue();
+
+        RecordProcessorUtils.getRecordProcessor2(operator2).accept(new StreamRecord<>("test"));
+        assertThat(operator2.setKeyContextElement2Called).isTrue();
+        assertThat(operator2.processElement2Called).isTrue();
+
+        RecordProcessorUtils.getRecordProcessor2(operator3).accept(new StreamRecord<>("test"));
+        assertThat(operator3.setKeyContextElement2Called).isFalse();
+        assertThat(operator3.processElement2Called).isTrue();
+    }
+
+    @Test
+    void testOverrideSetKeyContextElementForOneInputStreamOperator() throws Exception {
+        // test no override
+        NoOverrideOneInputStreamOperator noOverride = new NoOverrideOneInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor(noOverride).accept(new StreamRecord<>("test"));
+        assertThat(noOverride.setCurrentKeyCalled).isFalse();
+
+        // test override of "setKeyContextElement"
+        OverrideSetKeyContextOneInputStreamOperator overrideSetKeyContext =
+                new OverrideSetKeyContextOneInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor(overrideSetKeyContext)
+                .accept(new StreamRecord<>("test"));
+        assertThat(overrideSetKeyContext.setKeyContextElementCalled).isTrue();
+
+        // test override of "setKeyContextElement1"
+        OverrideSetKeyContext1OneInputStreamOperator overrideSetKeyContext1 =
+                new OverrideSetKeyContext1OneInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor(overrideSetKeyContext1)
+                .accept(new StreamRecord<>("test"));
+        assertThat(overrideSetKeyContext1.setKeyContextElement1Called).isTrue();
+    }
+
+    @Test
+    void testOverrideSetKeyContextElementForTwoInputStreamOperator() throws Exception {
+        // test no override
+        NoOverrideTwoInputStreamOperator noOverride = new NoOverrideTwoInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor1(noOverride).accept(new StreamRecord<>("test"));
+        RecordProcessorUtils.getRecordProcessor2(noOverride).accept(new StreamRecord<>("test"));
+        assertThat(noOverride.setCurrentKeyCalled).isFalse();
+
+        // test override of "setKeyContextElement1" and "setKeyContextElement2"
+        OverrideSetKeyContext1AndSetKeyContext2TwoInputStreamOperator override =
+                new OverrideSetKeyContext1AndSetKeyContext2TwoInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor1(override).accept(new StreamRecord<>("test"));
+        RecordProcessorUtils.getRecordProcessor2(override).accept(new StreamRecord<>("test"));
+        assertThat(override.setKeyContextElement1Called).isTrue();
+        assertThat(override.setKeyContextElement2Called).isTrue();
+    }
+
+    private static class NoOverrideOperator extends AbstractStreamOperator<String> {
+
+        boolean setCurrentKeyCalled = false;
+
+        NoOverrideOperator() throws Exception {
+            super();
+            // For the case that "setKeyContextElement" has not been overridden,
+            // we can determine whether "setKeyContextElement" is called through
+            // "setCurrentKey". According to the implementation, we need to make
+            // "stateKeySelector1/stateKeySelector2" non-null. Besides, we override
+            // "hasKeyContext1"/"hasKeyContext2" to keep "stateKeySelector1/stateKeySelector2"
+            // from affecting their return values.
+            Configuration configuration = new Configuration();
+            KeySelector keySelector = x -> x;
+            InstantiationUtil.writeObjectToConfig(keySelector, configuration, "statePartitioner0");
+            InstantiationUtil.writeObjectToConfig(keySelector, configuration, "statePartitioner1");
+            setup(
+                    new StreamTaskITCase.NoOpStreamTask<>(new DummyEnvironment()),
+                    new MockStreamConfig(configuration, 1),
+                    new MockOutput<>(new ArrayList<>()));
+        }
+
+        @Override
+        public boolean hasKeyContext1() {
+            return false;
+        }
+
+        @Override
+        public boolean hasKeyContext2() {
+            return false;
+        }
+
+        @Override
+        public void setCurrentKey(Object key) {
+            setCurrentKeyCalled = true;
+        }
+    }
+
+    private static class NoOverrideOneInputStreamOperator extends NoOverrideOperator
+            implements OneInputStreamOperator<String, String> {
+
+        NoOverrideOneInputStreamOperator() throws Exception {
+            super();
+        }
+
+        @Override
+        public void processElement(StreamRecord<String> element) throws Exception {}
+    }
+
+    private static class OverrideSetKeyContextOneInputStreamOperator
+            extends NoOverrideOneInputStreamOperator {
+        boolean setKeyContextElementCalled = false;
+
+        OverrideSetKeyContextOneInputStreamOperator() throws Exception {
+            super();
+        }
+
+        @Override
+        public void setKeyContextElement(StreamRecord<String> record) throws Exception {
+            setKeyContextElementCalled = true;
+        }
+    }
+
+    private static class OverrideSetKeyContext1OneInputStreamOperator
+            extends NoOverrideOneInputStreamOperator {
+        boolean setKeyContextElement1Called = false;
+
+        OverrideSetKeyContext1OneInputStreamOperator() throws Exception {
+            super();
+        }
+
+        @Override
+        public void setKeyContextElement1(StreamRecord record) throws Exception {
+            setKeyContextElement1Called = true;
+        }
+    }
+
+    private static class NoOverrideTwoInputStreamOperator extends NoOverrideOperator
+            implements TwoInputStreamOperator<String, String, String> {
+
+        NoOverrideTwoInputStreamOperator() throws Exception {
+            super();
+        }
+
+        @Override
+        public void processElement1(StreamRecord<String> element) throws Exception {}
+
+        @Override
+        public void processElement2(StreamRecord<String> element) throws Exception {}
+    }
+
+    private static class OverrideSetKeyContext1AndSetKeyContext2TwoInputStreamOperator
+            extends NoOverrideTwoInputStreamOperator {
+
+        boolean setKeyContextElement1Called = false;
+
+        boolean setKeyContextElement2Called = false;
+
+        OverrideSetKeyContext1AndSetKeyContext2TwoInputStreamOperator() throws Exception {
+            super();
+        }
+
+        @Override
+        public void setKeyContextElement1(StreamRecord record) throws Exception {
+            setKeyContextElement1Called = true;
+        }
+
+        @Override
+        public void setKeyContextElement2(StreamRecord record) throws Exception {
+            setKeyContextElement2Called = true;
+        }
+    }
+
+    private static class TestOperator
+            implements Input<String>, TwoInputStreamOperator<String, String, String> {
+        boolean setKeyContextElementCalled = false;
+        boolean processElementCalled = false;
+
+        boolean setKeyContextElement1Called = false;
+        boolean processElement1Called = false;
+
+        boolean setKeyContextElement2Called = false;
+        boolean processElement2Called = false;
+
+        @Override
+        public void processElement(StreamRecord<String> element) throws Exception {
+            processElementCalled = true;
+        }
+
+        @Override
+        public void processWatermark(Watermark mark) throws Exception {}
+
+        @Override
+        public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {}
+
+        @Override
+        public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {}
+
+        @Override
+        public void setKeyContextElement(StreamRecord<String> record) throws Exception {
+            setKeyContextElementCalled = true;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+
+        @Override
+        public void setCurrentKey(Object key) {}
+
+        @Override
+        public Object getCurrentKey() {
+            return null;
+        }
+
+        @Override
+        public void open() throws Exception {}
+
+        @Override
+        public void finish() throws Exception {}
+
+        @Override
+        public void close() throws Exception {}
+
+        @Override
+        public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {}
+
+        @Override
+        public OperatorSnapshotFutures snapshotState(
+                long checkpointId,
+                long timestamp,
+                CheckpointOptions checkpointOptions,
+                CheckpointStreamFactory storageLocation)
+                throws Exception {
+            return null;
+        }
+
+        @Override
+        public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
+                throws Exception {}
+
+        @Override
+        public OperatorMetricGroup getMetricGroup() {
+            return null;
+        }
+
+        @Override
+        public OperatorID getOperatorID() {
+            return null;
+        }
+
+        @Override
+        public void setKeyContextElement1(StreamRecord<?> record) throws Exception {
+            setKeyContextElement1Called = true;
+        }
+
+        @Override
+        public void setKeyContextElement2(StreamRecord<?> record) throws Exception {
+            setKeyContextElement2Called = true;
+        }
+
+        @Override
+        public void processElement1(StreamRecord<String> element) throws Exception {
+            processElement1Called = true;
+        }
+
+        @Override
+        public void processElement2(StreamRecord<String> element) throws Exception {
+            processElement2Called = true;
+        }
+
+        @Override
+        public void processWatermark1(Watermark mark) throws Exception {}
+
+        @Override
+        public void processWatermark2(Watermark mark) throws Exception {}
+
+        @Override
+        public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {}
+
+        @Override
+        public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {}
+
+        @Override
+        public void processWatermarkStatus1(WatermarkStatus watermarkStatus) throws Exception {}
+
+        @Override
+        public void processWatermarkStatus2(WatermarkStatus watermarkStatus) throws Exception {}
+    }
+
+    private static class TestKeyContextHandlerOperator extends TestOperator
+            implements KeyContextHandler {
+        private final boolean hasKeyContext1;
+        private final boolean hasKeyContext2;
+
+        TestKeyContextHandlerOperator(boolean hasKeyContext) {
+            this.hasKeyContext1 = hasKeyContext;
+            this.hasKeyContext2 = true;
+        }
+
+        TestKeyContextHandlerOperator(boolean hasKeyContext1, boolean hasKeyContext2) {
+            this.hasKeyContext1 = hasKeyContext1;
+            this.hasKeyContext2 = hasKeyContext2;
+        }
+
+        @Override
+        public boolean hasKeyContext() {
+            return hasKeyContext1;
+        }
+
+        @Override
+        public boolean hasKeyContext1() {
+            return hasKeyContext1;
+        }
+
+        @Override
+        public boolean hasKeyContext2() {
+            return hasKeyContext2;
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/InputBase.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/InputBase.java
index 2343441bebc..4d0a98ac4a6 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/InputBase.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/InputBase.java
@@ -19,15 +19,25 @@
 package org.apache.flink.table.runtime.operators.multipleinput.input;
 
 import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.KeyContextHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase;
 
 /** Base {@link Input} used in {@link MultipleInputStreamOperatorBase}.
*/
-public abstract class InputBase implements Input<RowData> {
+public abstract class InputBase implements Input<RowData>, KeyContextHandler {
 
     @Override
     public void setKeyContextElement(StreamRecord<RowData> record) throws Exception {
         // do nothing
     }
+
+    @Override
+    public boolean hasKeyContext() {
+        // Currently, we can simply return false because InputBase#setKeyContextElement is an empty
+        // implementation. Once there is a non-empty implementation in the future, this method
+        // should be adapted as well; otherwise InputBase#setKeyContextElement will never be
+        // called.
+        return false;
+    }
 }