This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch FLINK-18808-record-out
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9be2171a8fbeac72e34def779e1003bf099ab802
Author: Weijie Guo <res...@163.com>
AuthorDate: Wed Apr 26 19:39:51 2023 +0800

    [FLINK-18808][streaming] Include side outputs in numRecordsOut metric
    
    Co-authored-by: 李明 <limin...@meituan.com>
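    
    For illustration only (not part of this patch): a minimal sketch, in plain
    Java, of the counting pattern this change introduces. It assumes an output
    reports via collectAndCheckIfCountNeeded whether a record actually left the
    operator chain; the broadcasting collector then increments the task-level
    numRecordsOut at most once per incoming record, so records emitted only to
    side outputs are counted as well. CountingOutput and BroadcastSketch are
    hypothetical stand-ins, not Flink classes.
    
        interface CountingOutput<T> {
            // true if the record left the chain and should be counted for the task
            boolean collectAndCheckIfCountNeeded(T record);
        }
    
        final class BroadcastSketch<T> {
            private final CountingOutput<T>[] outputs;
            private long numRecordsOutForTask; // stands in for the task's metrics Counter
    
            BroadcastSketch(CountingOutput<T>[] outputs) {
                this.outputs = outputs;
            }
    
            void collect(T record) {
                boolean emitted = false;
                for (CountingOutput<T> output : outputs) {
                    // every output sees the record ...
                    emitted |= output.collectAndCheckIfCountNeeded(record);
                }
                if (emitted) {
                    // ... but the task counter is bumped at most once per record
                    numRecordsOutForTask++;
                }
            }
        }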
---
 .../streaming/runtime/io/RecordWriterOutput.java   |  18 ++-
 .../runtime/tasks/BroadcastingOutputCollector.java |  24 +++-
 .../tasks/CopyingBroadcastingOutputCollector.java  |  41 ++++---
 .../streaming/runtime/tasks/OperatorChain.java     |  39 ++++--
 .../runtime/tasks/MultipleInputStreamTaskTest.java | 131 +++++++++++++++++++++
 .../runtime/tasks/OneInputStreamTaskTest.java      |  88 ++++++++++++++
 .../streaming/runtime/tasks/OperatorChainTest.java |   5 +-
 .../runtime/tasks/TwoInputStreamTaskTest.java      | 103 ++++++++++++++++
 8 files changed, 415 insertions(+), 34 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 46fc01231be..6d1ae671153 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -19,7 +19,9 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -60,6 +62,10 @@ public class RecordWriterOutput<OUT>
 
     private WatermarkStatus announcedStatus = WatermarkStatus.ACTIVE;
 
+    // Uses a dummy counter here to avoid checking the existence of numRecordsOut on the
+    // per-record path.
+    private Counter numRecordsOut = new SimpleCounter();
+
     @SuppressWarnings("unchecked")
     public RecordWriterOutput(
             RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
@@ -86,12 +92,16 @@ public class RecordWriterOutput<OUT>
 
     @Override
     public void collect(StreamRecord<OUT> record) {
-        collectAndCheckIfCountNeeded(record);
+        if (collectAndCheckIfCountNeeded(record)) {
+            numRecordsOut.inc();
+        }
     }
 
     @Override
     public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
-        collectAndCheckIfCountNeeded(outputTag, record);
+        if (collectAndCheckIfCountNeeded(outputTag, record)) {
+            numRecordsOut.inc();
+        }
     }
 
     @Override
@@ -168,6 +178,10 @@ public class RecordWriterOutput<OUT>
         }
     }
 
+    public void setNumRecordsOut(Counter numRecordsOut) {
+        this.numRecordsOut = checkNotNull(numRecordsOut);
+    }
+
     public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
         if (isPriorityEvent
                 && event instanceof CheckpointBarrier
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java
index 344215f0e51..2ddc8781eb7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -31,12 +32,15 @@ import java.util.Random;
 
 class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
 
-    protected final Output<StreamRecord<T>>[] outputs;
+    protected final OutputWithChainingCheck<StreamRecord<T>>[] outputs;
     private final Random random = new XORShiftRandom();
     private final WatermarkGauge watermarkGauge = new WatermarkGauge();
+    protected final Counter numRecordsOutForTask;
 
-    public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
+    public BroadcastingOutputCollector(
+            OutputWithChainingCheck<StreamRecord<T>>[] outputs, Counter numRecordsOutForTask) {
         this.outputs = outputs;
+        this.numRecordsOutForTask = numRecordsOutForTask;
     }
 
     @Override
@@ -73,15 +77,23 @@ class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<Str
 
     @Override
     public void collect(StreamRecord<T> record) {
-        for (Output<StreamRecord<T>> output : outputs) {
-            output.collect(record);
+        boolean emitted = false;
+        for (OutputWithChainingCheck<StreamRecord<T>> output : outputs) {
+            emitted |= output.collectAndCheckIfCountNeeded(record);
+        }
+        if (emitted) {
+            numRecordsOutForTask.inc();
         }
     }
 
     @Override
     public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
-        for (Output<StreamRecord<T>> output : outputs) {
-            output.collect(outputTag, record);
+        boolean emitted = false;
+        for (OutputWithChainingCheck<StreamRecord<T>> output : outputs) {
+            emitted |= output.collectAndCheckIfCountNeeded(outputTag, record);
+        }
+        if (emitted) {
+            numRecordsOutForTask.inc();
         }
     }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingBroadcastingOutputCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingBroadcastingOutputCollector.java
index 9e179b4c464..a70fde3901d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingBroadcastingOutputCollector.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingBroadcastingOutputCollector.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.OutputTag;
 
@@ -27,37 +27,48 @@ import org.apache.flink.util.OutputTag;
  */
 final class CopyingBroadcastingOutputCollector<T> extends BroadcastingOutputCollector<T> {
 
-    public CopyingBroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
-        super(outputs);
+    public CopyingBroadcastingOutputCollector(
+            OutputWithChainingCheck<StreamRecord<T>>[] allOutputs, Counter numRecordsOutForTask) {
+        super(allOutputs, numRecordsOutForTask);
     }
 
     @Override
     public void collect(StreamRecord<T> record) {
+        boolean emitted = false;
+        int length = outputs.length;
 
-        for (int i = 0; i < outputs.length - 1; i++) {
-            Output<StreamRecord<T>> output = outputs[i];
+        for (int i = 0; i < length - 1; i++) {
+            OutputWithChainingCheck<StreamRecord<T>> output = outputs[i];
             StreamRecord<T> shallowCopy = record.copy(record.getValue());
-            output.collect(shallowCopy);
+            emitted |= output.collectAndCheckIfCountNeeded(shallowCopy);
         }
 
-        if (outputs.length > 0) {
-            // don't copy for the last output
-            outputs[outputs.length - 1].collect(record);
+        if (length > 0) {
+            emitted |= outputs[length - 1].collectAndCheckIfCountNeeded(record);
+        }
+
+        if (emitted) {
+            numRecordsOutForTask.inc();
         }
     }
 
     @Override
     public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
-        for (int i = 0; i < outputs.length - 1; i++) {
-            Output<StreamRecord<T>> output = outputs[i];
+        boolean emitted = false;
+        int length = outputs.length;
 
+        for (int i = 0; i < length - 1; i++) {
+            OutputWithChainingCheck<StreamRecord<T>> output = outputs[i];
             StreamRecord<X> shallowCopy = record.copy(record.getValue());
-            output.collect(outputTag, shallowCopy);
+            emitted |= output.collectAndCheckIfCountNeeded(outputTag, shallowCopy);
+        }
+
+        if (length > 0) {
+            emitted |= outputs[length - 1].collectAndCheckIfCountNeeded(outputTag, record);
         }
 
-        if (outputs.length > 0) {
-            // don't copy for the last output
-            outputs[outputs.length - 1].collect(outputTag, record);
+        if (emitted) {
+            numRecordsOutForTask.inc();
         }
     }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 2321af83976..0f27f2a06d5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.operators.coordination.AcknowledgeCheckpointEvent;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
@@ -658,9 +660,6 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
                         .getMetricGroup()
                         .getOrAddOperator(
                                 operatorConfig.getOperatorID(), operatorConfig.getOperatorName());
-        if (operatorConfig.isChainEnd()) {
-            operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
-        }
 
         return operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
     }
@@ -703,7 +702,7 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
             List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
             MailboxExecutorFactory mailboxExecutorFactory,
             boolean shouldAddMetric) {
-        List<WatermarkGaugeExposingOutput<StreamRecord<T>>> allOutputs = new ArrayList<>(4);
+        List<OutputWithChainingCheck<StreamRecord<T>>> allOutputs = new ArrayList<>(4);
 
         // create collectors for the network outputs
         for (NonChainedOutput streamOutput :
@@ -732,7 +731,8 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
                             outputEdge.getOutputTag(),
                             mailboxExecutorFactory,
                             shouldAddMetric);
-            allOutputs.add(output);
+            checkState(output instanceof OutputWithChainingCheck);
+            allOutputs.add((OutputWithChainingCheck) output);
             // If the operator has multiple downstream chained operators, only one of them should
             // increment the recordsOutCounter for this operator. Set shouldAddMetric to false
             // so that we would skip adding the counter to other downstream operators.
@@ -743,22 +743,35 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
 
         if (allOutputs.size() == 1) {
             result = allOutputs.get(0);
+            // Only if this is a single RecordWriterOutput, reuse its numRecordsOut for the task.
+            if (result instanceof RecordWriterOutput) {
+                Counter numRecordsOutCounter = createNumRecordsOutCounter(containingTask);
+                ((RecordWriterOutput<T>) result).setNumRecordsOut(numRecordsOutCounter);
+            }
         } else {
             // send to N outputs. Note that this includes the special case
             // of sending to zero outputs
             @SuppressWarnings({"unchecked"})
-            Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
+            OutputWithChainingCheck<StreamRecord<T>>[] allOutputsArray =
+                    new OutputWithChainingCheck[allOutputs.size()];
             for (int i = 0; i < allOutputs.size(); i++) {
-                asArray[i] = allOutputs.get(i);
+                allOutputsArray[i] = allOutputs.get(i);
             }
 
             // This is the inverse of creating the normal ChainingOutput.
             // If the chaining output does not copy we need to copy in the broadcast output,
             // otherwise multi-chaining would not work correctly.
+            Counter numRecordsOutForTask = createNumRecordsOutCounter(containingTask);
             if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
-                result = closer.register(new CopyingBroadcastingOutputCollector<>(asArray));
+                result =
+                        closer.register(
+                                new CopyingBroadcastingOutputCollector<>(
+                                        allOutputsArray, numRecordsOutForTask));
             } else {
-                result = closer.register(new BroadcastingOutputCollector<>(asArray));
+                result =
+                        closer.register(
+                                new BroadcastingOutputCollector<>(
+                                        allOutputsArray, numRecordsOutForTask));
             }
         }
 
@@ -774,6 +787,14 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
         return result;
     }
 
+    private static Counter createNumRecordsOutCounter(StreamTask<?, ?> containingTask) {
+        TaskIOMetricGroup taskIOMetricGroup =
+                containingTask.getEnvironment().getMetricGroup().getIOMetricGroup();
+        Counter counter = new SimpleCounter();
+        taskIOMetricGroup.reuseRecordsOutputCounter(counter);
+        return counter;
+    }
+
     /**
      * Recursively create chain of operators that starts from the given {@param operatorConfig}.
      * Operators are created tail to head and wrapped into an {@link WatermarkGaugeExposingOutput}.
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index 8c46fd5fb92..c458b598095 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -56,6 +56,7 @@ import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfData;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.StopMode;
+import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriterWithAvailabilityHelper;
 import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
@@ -84,12 +85,14 @@ import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.Input;
 import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTaskChainedSourcesCheckpointingTest.LifeCycleMonitorMultipleInputOperatorFactory;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest.WatermarkMetricOperator;
@@ -98,6 +101,7 @@ import org.apache.flink.streaming.util.CompletingCheckpointResponder;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.testutils.junit.SharedObjects;
 import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.SerializedValue;
 
 import org.hamcrest.collection.IsMapContaining;
@@ -1391,6 +1395,133 @@ public class MultipleInputStreamTaskTest {
                         new SerializedValue<>(new NoMoreSplitsEvent()));
     }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                MultipleInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .modifyExecutionConfig(applyObjectReuse(objectReuse))
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(partitionWriters)
+                        .setupOperatorChain(new OddEvenOperatorFactory())
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(
+                                SimpleOperatorFactory.of(
+                                        new OneInputStreamTaskTest.OddEvenOperator()))
+                        .addNonChainedOutputsCount(
+                                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(
+                                SimpleOperatorFactory.of(
+                                        new OneInputStreamTaskTest.DuplicatingOperator()))
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .finish()
+                        .setTaskMetricGroup(taskMetricGroup)
+                        .build()) {
+            Counter numRecordsInCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
+            Counter numRecordsOutCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+
+            int numOddRecords = 5;
+            int numEvenRecords = 3;
+            int numNaturalRecords = 2;
+            for (int x = 0; x < numOddRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(x * 2 + 1));
+            }
+            for (int x = 0; x < numEvenRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(x * 2));
+            }
+            for (int x = 0; x < numNaturalRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(x));
+            }
+
+            int totalOddRecords = numOddRecords + numNaturalRecords / 2;
+            int totalEvenRecords = numEvenRecords + (int) Math.ceil(numNaturalRecords / 2.0);
+            assertEquals(totalOddRecords + totalEvenRecords, numRecordsInCounter.getCount());
+            assertEquals(
+                    totalOddRecords
+                            + (totalOddRecords + totalEvenRecords)
+                            + (totalOddRecords + totalEvenRecords) * 2,
+                    numRecordsOutCounter.getCount());
+            testHarness.waitForTaskCompletion();
+        } finally {
+            for (ResultPartitionWriter partitionWriter : partitionWriters) {
+                partitionWriter.close();
+            }
+        }
+    }
+
+    static class OddEvenOperator extends AbstractStreamOperatorV2<Integer>
+            implements MultipleInputStreamOperator<Integer> {
+
+        public OddEvenOperator(StreamOperatorParameters<Integer> parameters) {
+            super(parameters, 3);
+        }
+
+        @Override
+        public List<Input> getInputs() {
+            return Arrays.asList(
+                    new OddEvenInput(this, 1),
+                    new OddEvenInput(this, 2),
+                    new OddEvenInput(this, 3));
+        }
+
+        static class OddEvenInput extends AbstractInput<Integer, Integer> {
+            private final OutputTag<Integer> oddOutputTag =
+                    new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO);
+            private final OutputTag<Integer> evenOutputTag =
+                    new OutputTag<>("even", BasicTypeInfo.INT_TYPE_INFO);
+
+            public OddEvenInput(AbstractStreamOperatorV2<Integer> owner, int inputId) {
+                super(owner, inputId);
+            }
+
+            @Override
+            public void processElement(StreamRecord<Integer> element) throws Exception {
+                if (element.getValue() % 2 == 0) {
+                    output.collect(evenOutputTag, element);
+                } else {
+                    output.collect(oddOutputTag, element);
+                }
+                output.collect(element);
+            }
+        }
+    }
+
+    private static class OddEvenOperatorFactory extends AbstractStreamOperatorFactory<Integer> {
+        @Override
+        public <T extends StreamOperator<Integer>> T createStreamOperator(
+                StreamOperatorParameters<Integer> parameters) {
+            return (T) new OddEvenOperator(parameters);
+        }
+
+        @Override
+        public Class<? extends StreamOperator<Integer>> getStreamOperatorClass(
+                ClassLoader classLoader) {
+            return OddEvenOperator.class;
+        }
+    }
+
     static class LifeCycleTrackingMapToStringMultipleInputOperator
             extends MapToStringMultipleInputOperator implements BoundedMultiInput {
         public static final String OPEN = "MultipleInputOperator#open";
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index f8e60522b31..691718adf78 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -40,6 +40,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.AvailabilityProvider;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriterWithAvailabilityHelper;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.MetricNames;
@@ -48,6 +50,7 @@ import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
 import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
@@ -59,13 +62,16 @@ import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
@@ -996,6 +1002,88 @@ public class OneInputStreamTaskTest extends TestLogger {
         }
     }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(partitionWriters)
+                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator()))
+                        .addNonChainedOutputsCount(
+                                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new DuplicatingOperator()))
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .finish()
+                        .setTaskMetricGroup(taskMetricGroup)
+                        .build()) {
+            Counter numRecordsInCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
+            Counter numRecordsOutCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+
+            final int numEvenRecords = 5;
+            final int numOddRecords = 3;
+
+            for (int x = 0; x < numEvenRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x));
+            }
+
+            for (int x = 0; x < numOddRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x + 1));
+            }
+            assertEquals(numOddRecords + numEvenRecords, numRecordsInCounter.getCount());
+            assertEquals(
+                    numOddRecords
+                            + (numOddRecords + numEvenRecords)
+                            + (numOddRecords + numEvenRecords) * 2,
+                    numRecordsOutCounter.getCount());
+            testHarness.waitForTaskCompletion();
+        } finally {
+            for (ResultPartitionWriter partitionWriter : partitionWriters) {
+                partitionWriter.close();
+            }
+        }
+    }
+
+    static class OddEvenOperator extends AbstractStreamOperator<Integer>
+            implements OneInputStreamOperator<Integer, Integer> {
+        private final OutputTag<Integer> oddOutputTag =
+                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO);
+        private final OutputTag<Integer> evenOutputTag =
+                new OutputTag<>("even", BasicTypeInfo.INT_TYPE_INFO);
+
+        @Override
+        public void processElement(StreamRecord<Integer> element) {
+            if (element.getValue() % 2 == 0) {
+                output.collect(evenOutputTag, element);
+            } else {
+                output.collect(oddOutputTag, element);
+            }
+            output.collect(element);
+        }
+    }
+
     static class WatermarkMetricOperator extends AbstractStreamOperator<String>
             implements OneInputStreamOperator<String, String> {
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
index c48b0843a7c..27ac625cf63 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
@@ -20,12 +20,12 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.SetupableStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
@@ -89,7 +89,8 @@ public class OperatorChainTest {
             // initial output goes to nowhere
             @SuppressWarnings({"unchecked", "rawtypes"})
             WatermarkGaugeExposingOutput<StreamRecord<T>> lastWriter =
-                    new BroadcastingOutputCollector<>(new Output[0]);
+                    new BroadcastingOutputCollector<>(
+                            new OutputWithChainingCheck[0], new SimpleCounter());
 
             // build the reverse operators array
             for (int i = 0; i < operators.length; i++) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 9d215a4b1a6..76130881c57 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -35,6 +35,8 @@ import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfData;
 import org.apache.flink.runtime.io.network.api.StopMode;
+import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriterWithAvailabilityHelper;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.MetricNames;
@@ -43,6 +45,7 @@ import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
 import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
@@ -53,17 +56,21 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.InputSelectable;
 import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.OutputTag;
 
 import org.hamcrest.collection.IsMapContaining;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -825,6 +832,102 @@ public class TwoInputStreamTaskTest {
         }
     }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                TwoInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(partitionWriters)
+                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(
+                                SimpleOperatorFactory.of(
+                                        new OneInputStreamTaskTest.OddEvenOperator()))
+                        .addNonChainedOutputsCount(
+                                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(
+                                SimpleOperatorFactory.of(
+                                        new OneInputStreamTaskTest.DuplicatingOperator()))
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .finish()
+                        .setTaskMetricGroup(taskMetricGroup)
+                        .build()) {
+            Counter numRecordsInCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
+            Counter numRecordsOutCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+
+            final int numEvenRecords = 5;
+            final int numOddRecords = 3;
+
+            for (int x = 0; x < numEvenRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x));
+            }
+
+            for (int x = 0; x < numOddRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x + 1));
+            }
+            assertEquals(numOddRecords + numEvenRecords, numRecordsInCounter.getCount());
+            assertEquals(
+                    numOddRecords
+                            + (numOddRecords + numEvenRecords)
+                            + (numOddRecords + numEvenRecords) * 2,
+                    numRecordsOutCounter.getCount());
+            testHarness.waitForTaskCompletion();
+        } finally {
+            for (ResultPartitionWriter partitionWriter : partitionWriters) {
+                partitionWriter.close();
+            }
+        }
+    }
+
+    static class OddEvenOperator extends AbstractStreamOperator<Integer>
+            implements TwoInputStreamOperator<Integer, Integer, Integer> {
+        private final OutputTag<Integer> oddOutputTag =
+                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO);
+        private final OutputTag<Integer> evenOutputTag =
+                new OutputTag<>("even", BasicTypeInfo.INT_TYPE_INFO);
+
+        @Override
+        public void processElement1(StreamRecord<Integer> element) throws Exception {
+            processElement(element);
+        }
+
+        @Override
+        public void processElement2(StreamRecord<Integer> element) throws Exception {
+            processElement(element);
+        }
+
+        private void processElement(StreamRecord<Integer> element) {
+            if (element.getValue() % 2 == 0) {
+                output.collect(evenOutputTag, element);
+            } else {
+                output.collect(oddOutputTag, element);
+            }
+            output.collect(element);
+        }
+    }
+
     // This must only be used in one test, otherwise the static fields will be changed
     // by several tests concurrently
     private static class TestOpenCloseMapFunction

