This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch FLINK-18808-record-out
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d5aa8a5ea2a9920e6368089cd8f82f0344ace6ef
Author: Weijie Guo <res...@163.com>
AuthorDate: Wed Apr 26 18:03:07 2023 +0800

    [FLINK-18808][streaming] Introduce OutputWithRecordsCountCheck.
---
 .../streaming/runtime/io/RecordWriterOutput.java   | 30 +++++++++++++---
 .../streaming/runtime/tasks/ChainingOutput.java    | 17 ++++++++-
 .../runtime/tasks/OutputWithChainingCheck.java     | 42 ++++++++++++++++++++++
 3 files changed, 83 insertions(+), 6 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 93acf23d30b..46fc01231be 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OutputWithChainingCheck;
 import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.OutputTag;
@@ -43,7 +44,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Implementation of {@link Output} that sends data using a {@link RecordWriter}. */
 @Internal
-public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<StreamRecord<OUT>> {
+public class RecordWriterOutput<OUT>
+        implements WatermarkGaugeExposingOutput<StreamRecord<OUT>>,
+                OutputWithChainingCheck<StreamRecord<OUT>> {
 
     private RecordWriter<SerializationDelegate<StreamElement>> recordWriter;
 
@@ -83,19 +86,36 @@ public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<Str
 
     @Override
     public void collect(StreamRecord<OUT> record) {
+        collectAndCheckIfCountNeeded(record);
+    }
+
+    @Override
+    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
+        collectAndCheckIfCountNeeded(outputTag, record);
+    }
+
+    @Override
+    public boolean collectAndCheckIfCountNeeded(StreamRecord<OUT> record) {
         if (this.outputTag != null) {
             // we are not responsible for emitting to the main output.
-            return;
+            return false;
         }
 
         pushToRecordWriter(record);
+        return true;
     }
 
     @Override
-    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
-        if (OutputTag.isResponsibleFor(this.outputTag, outputTag)) {
-            pushToRecordWriter(record);
+    public <X> boolean collectAndCheckIfCountNeeded(
+            OutputTag<X> outputTag, StreamRecord<X> record) {
+        if (!OutputTag.isResponsibleFor(this.outputTag, outputTag)) {
+            // we are not responsible for emitting to the side-output specified by this
+            // OutputTag.
+            return false;
         }
+
+        pushToRecordWriter(record);
+        return true;
     }
 
     private <X> void pushToRecordWriter(StreamRecord<X> record) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
index 0ff3785056f..5b2cf742442 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
@@ -36,7 +36,9 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
+class ChainingOutput<T>
+        implements WatermarkGaugeExposingOutput<StreamRecord<T>>,
+                OutputWithChainingCheck<StreamRecord<T>> {
     private static final Logger LOG = LoggerFactory.getLogger(ChainingOutput.class);
 
     protected final Input<T> input;
@@ -82,6 +84,19 @@ class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>>
         }
     }
 
+    @Override
+    public boolean collectAndCheckIfCountNeeded(StreamRecord<T> record) {
+        collect(record);
+        return false;
+    }
+
+    @Override
+    public <X> boolean collectAndCheckIfCountNeeded(
+            OutputTag<X> outputTag, StreamRecord<X> record) {
+        collect(outputTag, record);
+        return false;
+    }
+
     protected <X> void pushToOperator(StreamRecord<X> record) {
         try {
             // we know that the given outputTag matches our OutputTag so the record
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputWithChainingCheck.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputWithChainingCheck.java
new file mode 100644
index 00000000000..671a3221357
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputWithChainingCheck.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * This is a wrapper for outputs to check whether the collected record has been emitted to a
+ * downstream subtask or to a chained operator.
+ */
+@Internal
+public interface OutputWithChainingCheck<OUT> extends WatermarkGaugeExposingOutput<OUT> {
+    /**
+     * @return true if the collected record has been emitted to a downstream subtask. Otherwise,
+     *     false.
+     */
+    boolean collectAndCheckIfCountNeeded(OUT record);
+
+    /**
+     * @return true if the collected record has been emitted to a downstream subtask. Otherwise,
+     *     false.
+     */
+    <X> boolean collectAndCheckIfCountNeeded(OutputTag<X> outputTag, StreamRecord<X> record);
+}

Reply via email to