This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch FLINK-18808-record-out in repository https://gitbox.apache.org/repos/asf/flink.git
commit d5aa8a5ea2a9920e6368089cd8f82f0344ace6ef Author: Weijie Guo <res...@163.com> AuthorDate: Wed Apr 26 18:03:07 2023 +0800 [FLINK-18808][streaming] Introduce OutputWithChainingCheck. --- .../streaming/runtime/io/RecordWriterOutput.java | 30 +++++++++++++--- .../streaming/runtime/tasks/ChainingOutput.java | 17 ++++++++- .../runtime/tasks/OutputWithChainingCheck.java | 42 ++++++++++++++++++++++ 3 files changed, 83 insertions(+), 6 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java index 93acf23d30b..46fc01231be 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -32,6 +32,7 @@ import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OutputWithChainingCheck; import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.util.OutputTag; @@ -43,7 +44,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** Implementation of {@link Output} that sends data using a {@link RecordWriter}. 
*/ @Internal -public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<StreamRecord<OUT>> { +public class RecordWriterOutput<OUT> + implements WatermarkGaugeExposingOutput<StreamRecord<OUT>>, + OutputWithChainingCheck<StreamRecord<OUT>> { private RecordWriter<SerializationDelegate<StreamElement>> recordWriter; @@ -83,19 +86,36 @@ public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<Str @Override public void collect(StreamRecord<OUT> record) { + collectAndCheckIfCountNeeded(record); + } + + @Override + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { + collectAndCheckIfCountNeeded(outputTag, record); + } + + @Override + public boolean collectAndCheckIfCountNeeded(StreamRecord<OUT> record) { if (this.outputTag != null) { // we are not responsible for emitting to the main output. - return; + return false; } pushToRecordWriter(record); + return true; } @Override - public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { - if (OutputTag.isResponsibleFor(this.outputTag, outputTag)) { - pushToRecordWriter(record); + public <X> boolean collectAndCheckIfCountNeeded( + OutputTag<X> outputTag, StreamRecord<X> record) { + if (!OutputTag.isResponsibleFor(this.outputTag, outputTag)) { + // we are not responsible for emitting to the side-output specified by this + // OutputTag. 
+ return false; } + + pushToRecordWriter(record); + return true; } private <X> void pushToRecordWriter(StreamRecord<X> record) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java index 0ff3785056f..5b2cf742442 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java @@ -36,7 +36,9 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; -class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> { +class ChainingOutput<T> + implements WatermarkGaugeExposingOutput<StreamRecord<T>>, + OutputWithChainingCheck<StreamRecord<T>> { private static final Logger LOG = LoggerFactory.getLogger(ChainingOutput.class); protected final Input<T> input; @@ -82,6 +84,19 @@ class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> } } + @Override + public boolean collectAndCheckIfCountNeeded(StreamRecord<T> record) { + collect(record); + return false; + } + + @Override + public <X> boolean collectAndCheckIfCountNeeded( + OutputTag<X> outputTag, StreamRecord<X> record) { + collect(outputTag, record); + return false; + } + protected <X> void pushToOperator(StreamRecord<X> record) { try { // we know that the given outputTag matches our OutputTag so the record diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputWithChainingCheck.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputWithChainingCheck.java new file mode 100644 index 00000000000..671a3221357 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputWithChainingCheck.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.OutputTag; + +/** + * This is a wrapper for outputs to check whether the collected record has been emitted to a + * downstream subtask or to a chained operator. + */ +@Internal +public interface OutputWithChainingCheck<OUT> extends WatermarkGaugeExposingOutput<OUT> { + /** + * @return true if the collected record has been emitted to a downstream subtask. Otherwise, + * false. + */ + boolean collectAndCheckIfCountNeeded(OUT record); + + /** + * @return true if the collected record has been emitted to a downstream subtask. Otherwise, + * false. + */ + <X> boolean collectAndCheckIfCountNeeded(OutputTag<X> outputTag, StreamRecord<X> record); +}