This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c45c4a05dd2 [FLINK-30536][runtime] Remove CountingOutput from per-record code path for most operators
c45c4a05dd2 is described below

commit c45c4a05dd2043e03716b51f5ea76219c73f327b
Author: Dong Lin <lindon...@gmail.com>
AuthorDate: Mon Jan 9 18:44:29 2023 +0800

    [FLINK-30536][runtime] Remove CountingOutput from per-record code path for most operators
    
    This closes #21579.
---
 .../api/operators/AbstractStreamOperator.java      |  28 ++---
 .../api/operators/AbstractStreamOperatorV2.java    |  38 ++-----
 .../streaming/api/operators/CountingOutput.java    |  16 ++-
 .../runtime/operators/sink/SinkWriterOperator.java |  18 ----
 .../streaming/runtime/tasks/ChainingOutput.java    |  30 ++----
 .../runtime/tasks/CopyingChainingOutput.java       |  16 +--
 .../streaming/runtime/tasks/OperatorChain.java     | 120 ++++++++++++++++++---
 .../streaming/runtime/tasks/OperatorChainTest.java |   2 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |   5 +
 .../tasks/SubtaskCheckpointCoordinatorTest.java    |   3 +-
 10 files changed, 155 insertions(+), 121 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a1afe279833..d2a24dee4c7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -159,23 +159,13 @@ public abstract class AbstractStreamOperator<OUT>
         final Environment environment = containingTask.getEnvironment();
         this.container = containingTask;
         this.config = config;
-        try {
-            InternalOperatorMetricGroup operatorMetricGroup =
-                    environment
-                            .getMetricGroup()
-                            .getOrAddOperator(config.getOperatorID(), 
config.getOperatorName());
-            this.output = registerCounterOnOutput(output, operatorMetricGroup);
-            if (config.isChainEnd()) {
-                
operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
-            }
-            this.metrics = operatorMetricGroup;
-        } catch (Exception e) {
-            LOG.warn("An error occurred while instantiating task metrics.", e);
-            this.metrics = 
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
-            this.output = output;
-        }
-
+        this.output = output;
+        this.metrics =
+                environment
+                        .getMetricGroup()
+                        .getOrAddOperator(config.getOperatorID(), 
config.getOperatorName());
         this.combinedWatermark = 
IndexedCombinedWatermarkStatus.forInputsCount(2);
+
         try {
             Configuration taskManagerConfig = 
environment.getTaskManagerInfo().getConfiguration();
             int historySize = 
taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
@@ -646,10 +636,4 @@ public abstract class AbstractStreamOperator<OUT>
     protected Optional<InternalTimeServiceManager<?>> getTimeServiceManager() {
         return Optional.ofNullable(timeServiceManager);
     }
-
-    protected Output<StreamRecord<OUT>> registerCounterOnOutput(
-            Output<StreamRecord<OUT>> output, OperatorMetricGroup 
operatorMetricGroup) {
-        return new CountingOutput<>(
-                output, 
operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
-    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
index 5f82e85016d..a10206ba0e5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
@@ -104,50 +104,26 @@ public abstract class AbstractStreamOperatorV2<OUT>
     public AbstractStreamOperatorV2(StreamOperatorParameters<OUT> parameters, 
int numberOfInputs) {
         final Environment environment = 
parameters.getContainingTask().getEnvironment();
         config = parameters.getStreamConfig();
-        CountingOutput<OUT> countingOutput;
-        InternalOperatorMetricGroup operatorMetricGroup;
-        try {
-            operatorMetricGroup =
-                    environment
-                            .getMetricGroup()
-                            .getOrAddOperator(config.getOperatorID(), 
config.getOperatorName());
-            countingOutput =
-                    new CountingOutput(
-                            parameters.getOutput(),
-                            
operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
-            if (config.isChainEnd()) {
-                
operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
-            }
-        } catch (Exception e) {
-            LOG.warn("An error occurred while instantiating task metrics.", e);
-            countingOutput = null;
-            operatorMetricGroup = null;
-        }
-
-        if (countingOutput == null || operatorMetricGroup == null) {
-            metrics = 
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
-            output = parameters.getOutput();
-        } else {
-            metrics = operatorMetricGroup;
-            output = countingOutput;
-        }
-
+        output = parameters.getOutput();
+        metrics =
+                environment
+                        .getMetricGroup()
+                        .getOrAddOperator(config.getOperatorID(), 
config.getOperatorName());
         latencyStats =
                 createLatencyStats(
                         environment.getTaskManagerInfo().getConfiguration(),
                         
parameters.getContainingTask().getIndexInSubtaskGroup());
-
         processingTimeService = 
Preconditions.checkNotNull(parameters.getProcessingTimeService());
         executionConfig = parameters.getContainingTask().getExecutionConfig();
         userCodeClassLoader = 
parameters.getContainingTask().getUserCodeClassLoader();
         cancelables = parameters.getContainingTask().getCancelables();
-        this.combinedWatermark = 
IndexedCombinedWatermarkStatus.forInputsCount(numberOfInputs);
+        combinedWatermark = 
IndexedCombinedWatermarkStatus.forInputsCount(numberOfInputs);
 
         runtimeContext =
                 new StreamingRuntimeContext(
                         environment,
                         environment.getAccumulatorRegistry().getUserMap(),
-                        operatorMetricGroup,
+                        metrics,
                         getOperatorID(),
                         processingTimeService,
                         null,
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CountingOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CountingOutput.java
index c87e05439b0..96028c139a6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CountingOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CountingOutput.java
@@ -19,20 +19,23 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.OutputTag;
 
 /** Wrapping {@link Output} that updates metrics on the number of emitted 
elements. */
-public class CountingOutput<OUT> implements Output<StreamRecord<OUT>> {
-    private final Output<StreamRecord<OUT>> output;
+public class CountingOutput<OUT> implements 
WatermarkGaugeExposingOutput<StreamRecord<OUT>> {
+    private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> output;
     private final Counter numRecordsOut;
 
-    public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
+    public CountingOutput(
+            WatermarkGaugeExposingOutput<StreamRecord<OUT>> output, Counter 
numRecordsOut) {
         this.output = output;
-        this.numRecordsOut = counter;
+        this.numRecordsOut = numRecordsOut;
     }
 
     @Override
@@ -66,4 +69,9 @@ public class CountingOutput<OUT> implements 
Output<StreamRecord<OUT>> {
     public void close() {
         output.close();
     }
+
+    @Override
+    public Gauge<Long> getWatermarkGauge() {
+        return output.getWatermarkGauge();
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index 9029584f986..e593616abf7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.connector.sink2.StatefulSink;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -42,7 +41,6 @@ import 
org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -218,22 +216,6 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
         closeAll(sinkWriter, super::close);
     }
 
-    /**
-     * Skip registering numRecordsOut counter on output.
-     *
-     * <p>Metric "numRecordsOut" is defined as the total number of records 
written to the external
-     * system in FLIP-33, but this metric is occupied in 
AbstractStreamOperator as the number of
-     * records sent to downstream operators, which is number of Committable 
batches sent to
-     * SinkCommitter. So we skip registering this metric on output and leave 
this metric to sink
-     * writer implementations to report.
-     */
-    @Override
-    protected Output<StreamRecord<CommittableMessage<CommT>>> 
registerCounterOnOutput(
-            Output<StreamRecord<CommittableMessage<CommT>>> output,
-            OperatorMetricGroup operatorMetricGroup) {
-        return output;
-    }
-
     private void emit(
             int indexOfThisSubtask,
             int numberOfParallelSubtasks,
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
index 8ab88a301a2..0bf275b2cd7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
@@ -20,10 +20,8 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.streaming.api.operators.Input;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
@@ -40,33 +38,26 @@ class ChainingOutput<T> implements 
WatermarkGaugeExposingOutput<StreamRecord<T>>
     private static final Logger LOG = 
LoggerFactory.getLogger(ChainingOutput.class);
 
     protected final Input<T> input;
+    protected final Counter numRecordsOut;
     protected final Counter numRecordsIn;
     protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
     @Nullable protected final OutputTag<T> outputTag;
     protected WatermarkStatus announcedStatus = WatermarkStatus.ACTIVE;
 
-    public ChainingOutput(OneInputStreamOperator<T, ?> operator, @Nullable 
OutputTag<T> outputTag) {
-        this(operator, operator.getMetricGroup(), outputTag);
-    }
-
     public ChainingOutput(
             Input<T> input,
-            OperatorMetricGroup operatorMetricGroup,
+            @Nullable Counter prevNumRecordsOut,
+            OperatorMetricGroup curOperatorMetricGroup,
             @Nullable OutputTag<T> outputTag) {
         this.input = input;
-
-        {
-            Counter tmpNumRecordsIn;
-            try {
-                OperatorIOMetricGroup ioMetricGroup = 
operatorMetricGroup.getIOMetricGroup();
-                tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
-            } catch (Exception e) {
-                LOG.warn("An exception occurred during the metrics setup.", e);
-                tmpNumRecordsIn = new SimpleCounter();
-            }
-            numRecordsIn = tmpNumRecordsIn;
+        if (prevNumRecordsOut != null) {
+            this.numRecordsOut = prevNumRecordsOut;
+        } else {
+            // Uses a dummy counter here to avoid checking the existence of 
numRecordsOut on the
+            // per-record path.
+            this.numRecordsOut = new SimpleCounter();
         }
-
+        this.numRecordsIn = 
curOperatorMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
         this.outputTag = outputTag;
     }
 
@@ -94,6 +85,7 @@ class ChainingOutput<T> implements 
WatermarkGaugeExposingOutput<StreamRecord<T>>
             @SuppressWarnings("unchecked")
             StreamRecord<T> castRecord = (StreamRecord<T>) record;
 
+            numRecordsOut.inc();
             numRecordsIn.inc();
             input.setKeyContextElement(castRecord);
             input.processElement(castRecord);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java
index 16c87409736..c9091165ab7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.streaming.api.operators.Input;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.OutputTag;
 
@@ -30,20 +30,13 @@ final class CopyingChainingOutput<T> extends 
ChainingOutput<T> {
 
     private final TypeSerializer<T> serializer;
 
-    public CopyingChainingOutput(
-            OneInputStreamOperator<T, ?> operator,
-            TypeSerializer<T> serializer,
-            @Nullable OutputTag<T> outputTag) {
-        super(operator, outputTag);
-        this.serializer = serializer;
-    }
-
     public CopyingChainingOutput(
             Input<T> input,
             TypeSerializer<T> serializer,
-            OperatorMetricGroup operatorMetricGroup,
+            @Nullable Counter prevRecordsOutCounter,
+            OperatorMetricGroup curOperatorMetricGroup,
             @Nullable OutputTag<T> outputTag) {
-        super(input, operatorMetricGroup, outputTag);
+        super(input, prevRecordsOutCounter, curOperatorMetricGroup, outputTag);
         this.serializer = serializer;
     }
 
@@ -76,6 +69,7 @@ final class CopyingChainingOutput<T> extends 
ChainingOutput<T> {
             @SuppressWarnings("unchecked")
             StreamRecord<T> castRecord = (StreamRecord<T>) record;
 
+            numRecordsOut.inc();
             numRecordsIn.inc();
             StreamRecord<T> copy = 
castRecord.copy(serializer.copy(castRecord.getValue()));
             input.setKeyContextElement(copy);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index bec2c1d4a08..2321af83976 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -36,6 +37,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
 import 
org.apache.flink.runtime.operators.coordination.AcknowledgeCheckpointEvent;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
@@ -46,6 +48,7 @@ import org.apache.flink.streaming.api.graph.NonChainedOutput;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.CountingOutput;
 import org.apache.flink.streaming.api.operators.Input;
 import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -58,6 +61,7 @@ import 
org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
+import 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory;
 import org.apache.flink.util.FlinkException;
@@ -199,7 +203,8 @@ public abstract class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
                             userCodeClassloader,
                             recordWriterOutputs,
                             allOpWrappers,
-                            containingTask.getMailboxExecutorFactory());
+                            containingTask.getMailboxExecutorFactory(),
+                            operatorFactory != null);
 
             if (operatorFactory != null) {
                 Tuple2<OP, Optional<ProcessingTimeService>> 
mainOperatorAndTimeService =
@@ -622,6 +627,44 @@ public abstract class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
         return chainedSourceInputs;
     }
 
+    /**
+     * Get the numRecordsOut counter for the operator represented by the given 
config. And re-use
+     * the operator-level counter for the task-level numRecordsOut counter if 
this operator is at
+     * the end of the operator chain.
+     *
+     * <p>Return null if we should not use the numRecordsOut counter to track 
the records emitted by
+     * this operator.
+     */
+    @Nullable
+    private Counter getOperatorRecordsOutCounter(
+            StreamTask<?, ?> containingTask, StreamConfig operatorConfig) {
+        ClassLoader userCodeClassloader = 
containingTask.getUserCodeClassLoader();
+        StreamOperatorFactory<?> operatorFactory =
+                operatorConfig.getStreamOperatorFactory(userCodeClassloader);
+        // Do not use the numRecordsOut counter on output if this operator is 
SinkWriterOperator.
+        //
+        // Metric "numRecordsOut" is defined as the total number of records 
written to the
+        // external system in FLIP-33, but this metric is occupied in 
AbstractStreamOperator as the
+        // number of records sent to downstream operators, which is number of 
Committable batches
+        // sent to SinkCommitter. So we skip registering this metric on output 
and leave this metric
+        // to sink writer implementations to report.
+        if (operatorFactory instanceof SinkWriterOperatorFactory) {
+            return null;
+        }
+
+        InternalOperatorMetricGroup operatorMetricGroup =
+                containingTask
+                        .getEnvironment()
+                        .getMetricGroup()
+                        .getOrAddOperator(
+                                operatorConfig.getOperatorID(), 
operatorConfig.getOperatorName());
+        if (operatorConfig.isChainEnd()) {
+            operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
+        }
+
+        return 
operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+    }
+
     @SuppressWarnings({"rawtypes", "unchecked"})
     private WatermarkGaugeExposingOutput<StreamRecord> 
createChainedSourceOutput(
             StreamTask<?, OP> containingTask,
@@ -631,14 +674,18 @@ public abstract class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
             OperatorMetricGroup metricGroup,
             OutputTag outputTag) {
 
+        Counter recordsOutCounter = 
getOperatorRecordsOutCounter(containingTask, sourceInputConfig);
+
         WatermarkGaugeExposingOutput<StreamRecord> chainedSourceOutput;
         if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
-            chainedSourceOutput = new ChainingOutput(input, metricGroup, 
outputTag);
+            chainedSourceOutput =
+                    new ChainingOutput(input, recordsOutCounter, metricGroup, 
outputTag);
         } else {
             TypeSerializer<?> inSerializer =
                     
sourceInputConfig.getTypeSerializerOut(userCodeClassloader);
             chainedSourceOutput =
-                    new CopyingChainingOutput(input, inSerializer, 
metricGroup, outputTag);
+                    new CopyingChainingOutput(
+                            input, inSerializer, recordsOutCounter, 
metricGroup, outputTag);
         }
         /**
          * Chained sources are closed when {@link
@@ -654,7 +701,8 @@ public abstract class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
             ClassLoader userCodeClassloader,
             Map<IntermediateDataSetID, RecordWriterOutput<?>> 
recordWriterOutputs,
             List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
-            MailboxExecutorFactory mailboxExecutorFactory) {
+            MailboxExecutorFactory mailboxExecutorFactory,
+            boolean shouldAddMetric) {
         List<WatermarkGaugeExposingOutput<StreamRecord<T>>> allOutputs = new 
ArrayList<>(4);
 
         // create collectors for the network outputs
@@ -675,18 +723,26 @@ public abstract class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
             WatermarkGaugeExposingOutput<StreamRecord<T>> output =
                     createOperatorChain(
                             containingTask,
+                            operatorConfig,
                             chainedOpConfig,
                             chainedConfigs,
                             userCodeClassloader,
                             recordWriterOutputs,
                             allOperatorWrappers,
                             outputEdge.getOutputTag(),
-                            mailboxExecutorFactory);
+                            mailboxExecutorFactory,
+                            shouldAddMetric);
             allOutputs.add(output);
+            // If the operator has multiple downstream chained operators, only 
one of them should
+            // increment the recordsOutCounter for this operator. Set 
shouldAddMetric to false
+            // so that we would skip adding the counter to other downstream 
operators.
+            shouldAddMetric = false;
         }
 
+        WatermarkGaugeExposingOutput<StreamRecord<T>> result;
+
         if (allOutputs.size() == 1) {
-            return allOutputs.get(0);
+            result = allOutputs.get(0);
         } else {
             // send to N outputs. Note that this includes the special case
             // of sending to zero outputs
@@ -700,11 +756,22 @@ public abstract class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
             // If the chaining output does not copy we need to copy in the 
broadcast output,
             // otherwise multi-chaining would not work correctly.
             if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
-                return closer.register(new 
CopyingBroadcastingOutputCollector<>(asArray));
+                result = closer.register(new 
CopyingBroadcastingOutputCollector<>(asArray));
             } else {
-                return closer.register(new 
BroadcastingOutputCollector<>(asArray));
+                result = closer.register(new 
BroadcastingOutputCollector<>(asArray));
+            }
+        }
+
+        if (shouldAddMetric) {
+            // Create a CountingOutput to increment the recordsOutCounter for 
this operator
+            // if we have not added the counter to any downstream chained 
operator.
+            Counter recordsOutCounter =
+                    getOperatorRecordsOutCounter(containingTask, 
operatorConfig);
+            if (recordsOutCounter != null) {
+                result = new CountingOutput<>(result, recordsOutCounter);
             }
         }
+        return result;
     }
 
     /**
@@ -713,13 +780,15 @@ public abstract class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
      */
     private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> 
createOperatorChain(
             StreamTask<OUT, ?> containingTask,
+            StreamConfig prevOperatorConfig,
             StreamConfig operatorConfig,
             Map<Integer, StreamConfig> chainedConfigs,
             ClassLoader userCodeClassloader,
             Map<IntermediateDataSetID, RecordWriterOutput<?>> 
recordWriterOutputs,
             List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
             OutputTag<IN> outputTag,
-            MailboxExecutorFactory mailboxExecutorFactory) {
+            MailboxExecutorFactory mailboxExecutorFactory,
+            boolean shouldAddMetricForPrevOperator) {
         // create the output that the operator writes to first. this may 
recursively create more
         // operators
         WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput =
@@ -730,7 +799,8 @@ public abstract class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
                         userCodeClassloader,
                         recordWriterOutputs,
                         allOperatorWrappers,
-                        mailboxExecutorFactory);
+                        mailboxExecutorFactory,
+                        true);
 
         OneInputStreamOperator<IN, OUT> chainedOperator =
                 createOperator(
@@ -742,7 +812,13 @@ public abstract class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
                         false);
 
         return wrapOperatorIntoOutput(
-                chainedOperator, containingTask, operatorConfig, 
userCodeClassloader, outputTag);
+                chainedOperator,
+                containingTask,
+                prevOperatorConfig,
+                operatorConfig,
+                userCodeClassloader,
+                outputTag,
+                shouldAddMetricForPrevOperator);
     }
 
     /**
@@ -786,17 +862,33 @@ public abstract class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
     private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> 
wrapOperatorIntoOutput(
             OneInputStreamOperator<IN, OUT> operator,
             StreamTask<OUT, ?> containingTask,
+            StreamConfig prevOperatorConfig,
             StreamConfig operatorConfig,
             ClassLoader userCodeClassloader,
-            OutputTag<IN> outputTag) {
+            OutputTag<IN> outputTag,
+            boolean shouldAddMetricForPrevOperator) {
+
+        Counter recordsOutCounter = null;
+
+        if (shouldAddMetricForPrevOperator) {
+            recordsOutCounter = getOperatorRecordsOutCounter(containingTask, 
prevOperatorConfig);
+        }
 
         WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
         if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
-            currentOperatorOutput = new ChainingOutput<>(operator, outputTag);
+            currentOperatorOutput =
+                    new ChainingOutput<>(
+                            operator, recordsOutCounter, 
operator.getMetricGroup(), outputTag);
         } else {
             TypeSerializer<IN> inSerializer =
                     operatorConfig.getTypeSerializerIn1(userCodeClassloader);
-            currentOperatorOutput = new CopyingChainingOutput<>(operator, 
inSerializer, outputTag);
+            currentOperatorOutput =
+                    new CopyingChainingOutput<>(
+                            operator,
+                            inSerializer,
+                            recordsOutCounter,
+                            operator.getMetricGroup(),
+                            outputTag);
         }
 
         // wrap watermark gauges since registered metrics must be unique
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
index 55da7e7d8c7..c48b0843a7c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
@@ -98,7 +98,7 @@ public class OperatorChainTest {
                 if (op instanceof SetupableStreamOperator) {
                     ((SetupableStreamOperator) op).setup(containingTask, cfg, 
lastWriter);
                 }
-                lastWriter = new ChainingOutput<>(op, null);
+                lastWriter = new ChainingOutput<>(op, null, 
op.getMetricGroup(), null);
 
                 ProcessingTimeService processingTimeService = null;
                 if (op instanceof AbstractStreamOperator) {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 38523c18431..51f3d0d2311 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -67,6 +67,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
 import org.apache.flink.runtime.metrics.TimerGauge;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -1907,6 +1908,8 @@ public class StreamTaskTest extends TestLogger {
                         any(CheckpointOptions.class),
                         any(CheckpointStreamFactory.class)))
                 .thenReturn(operatorSnapshotResult);
+        when(operator.getMetricGroup())
+                
.thenReturn(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup());
 
         return operator;
     }
@@ -1923,6 +1926,8 @@ public class StreamTaskTest extends TestLogger {
                         any(CheckpointOptions.class),
                         any(CheckpointStreamFactory.class)))
                 .thenThrow(exception);
+        when(operator.getMetricGroup())
+                
.thenReturn(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup());
 
         return operator;
     }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index bcec491d82a..734e1400240 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -41,6 +41,7 @@ import 
org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingRes
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -810,7 +811,7 @@ public class SubtaskCheckpointCoordinatorTest {
 
         @Override
         public OperatorMetricGroup getMetricGroup() {
-            return null;
+            return 
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
         }
 
         @Override


Reply via email to