This is an automated email from the ASF dual-hosted git repository. lindong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c45c4a05dd2 [FLINK-30536][runtime] Remove CountingOutput from per-record code path for most operators c45c4a05dd2 is described below commit c45c4a05dd2043e03716b51f5ea76219c73f327b Author: Dong Lin <lindon...@gmail.com> AuthorDate: Mon Jan 9 18:44:29 2023 +0800 [FLINK-30536][runtime] Remove CountingOutput from per-record code path for most operators This closes #21579. --- .../api/operators/AbstractStreamOperator.java | 28 ++--- .../api/operators/AbstractStreamOperatorV2.java | 38 ++----- .../streaming/api/operators/CountingOutput.java | 16 ++- .../runtime/operators/sink/SinkWriterOperator.java | 18 ---- .../streaming/runtime/tasks/ChainingOutput.java | 30 ++---- .../runtime/tasks/CopyingChainingOutput.java | 16 +-- .../streaming/runtime/tasks/OperatorChain.java | 120 ++++++++++++++++++--- .../streaming/runtime/tasks/OperatorChainTest.java | 2 +- .../streaming/runtime/tasks/StreamTaskTest.java | 5 + .../tasks/SubtaskCheckpointCoordinatorTest.java | 3 +- 10 files changed, 155 insertions(+), 121 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index a1afe279833..d2a24dee4c7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -159,23 +159,13 @@ public abstract class AbstractStreamOperator<OUT> final Environment environment = containingTask.getEnvironment(); this.container = containingTask; this.config = config; - try { - InternalOperatorMetricGroup operatorMetricGroup = - environment - .getMetricGroup() - .getOrAddOperator(config.getOperatorID(), config.getOperatorName()); - this.output = registerCounterOnOutput(output, operatorMetricGroup); - if (config.isChainEnd()) { - operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask(); - } - this.metrics = operatorMetricGroup; - } catch (Exception e) { - LOG.warn("An error occurred while instantiating task metrics.", e); - this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); - this.output = output; - } - + this.output = output; + this.metrics = + environment + .getMetricGroup() + .getOrAddOperator(config.getOperatorID(), config.getOperatorName()); this.combinedWatermark = IndexedCombinedWatermarkStatus.forInputsCount(2); + try { Configuration taskManagerConfig = environment.getTaskManagerInfo().getConfiguration(); int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE); @@ -646,10 +636,4 @@ public abstract class AbstractStreamOperator<OUT> protected Optional<InternalTimeServiceManager<?>> getTimeServiceManager() { return Optional.ofNullable(timeServiceManager); } - - protected Output<StreamRecord<OUT>> registerCounterOnOutput( - Output<StreamRecord<OUT>> output, OperatorMetricGroup operatorMetricGroup) { - return new CountingOutput<>( - output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter()); - } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java index 5f82e85016d..a10206ba0e5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java @@ -104,50 +104,26 @@ public abstract class AbstractStreamOperatorV2<OUT> public AbstractStreamOperatorV2(StreamOperatorParameters<OUT> parameters, int numberOfInputs) { final Environment environment = parameters.getContainingTask().getEnvironment(); config = parameters.getStreamConfig(); - CountingOutput<OUT> countingOutput; - InternalOperatorMetricGroup operatorMetricGroup; - try { - operatorMetricGroup = - environment - .getMetricGroup() - .getOrAddOperator(config.getOperatorID(), config.getOperatorName()); - countingOutput = - new CountingOutput( - parameters.getOutput(), - operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter()); - if (config.isChainEnd()) { - operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask(); - } - } catch (Exception e) { - LOG.warn("An error occurred while instantiating task metrics.", e); - countingOutput = null; - operatorMetricGroup = null; - } - - if (countingOutput == null || operatorMetricGroup == null) { - metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); - output = parameters.getOutput(); - } else { - metrics = operatorMetricGroup; - output = countingOutput; - } - + output = parameters.getOutput(); + metrics = + environment + .getMetricGroup() + .getOrAddOperator(config.getOperatorID(), config.getOperatorName()); latencyStats = createLatencyStats( environment.getTaskManagerInfo().getConfiguration(), parameters.getContainingTask().getIndexInSubtaskGroup()); - processingTimeService = Preconditions.checkNotNull(parameters.getProcessingTimeService()); executionConfig = parameters.getContainingTask().getExecutionConfig(); userCodeClassLoader = parameters.getContainingTask().getUserCodeClassLoader(); cancelables = parameters.getContainingTask().getCancelables(); - this.combinedWatermark = IndexedCombinedWatermarkStatus.forInputsCount(numberOfInputs); + combinedWatermark = IndexedCombinedWatermarkStatus.forInputsCount(numberOfInputs); runtimeContext = new StreamingRuntimeContext( environment, environment.getAccumulatorRegistry().getUserMap(), - operatorMetricGroup, + metrics, getOperatorID(), processingTimeService, null, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CountingOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CountingOutput.java index c87e05439b0..96028c139a6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CountingOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CountingOutput.java @@ -19,20 +19,23 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.util.OutputTag; /** Wrapping {@link Output} that updates metrics on the number of emitted elements. */ -public class CountingOutput<OUT> implements Output<StreamRecord<OUT>> { - private final Output<StreamRecord<OUT>> output; +public class CountingOutput<OUT> implements WatermarkGaugeExposingOutput<StreamRecord<OUT>> { + private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> output; private final Counter numRecordsOut; - public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) { + public CountingOutput( + WatermarkGaugeExposingOutput<StreamRecord<OUT>> output, Counter numRecordsOut) { this.output = output; - this.numRecordsOut = counter; + this.numRecordsOut = numRecordsOut; } @Override @@ -66,4 +69,9 @@ public class CountingOutput<OUT> implements Output<StreamRecord<OUT>> { public void close() { output.close(); } + + @Override + public Gauge<Long> getWatermarkGauge() { + return output.getWatermarkGauge(); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java index 9029584f986..e593616abf7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java @@ -30,7 +30,6 @@ import org.apache.flink.api.connector.sink2.StatefulSink; import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter; import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; import org.apache.flink.runtime.state.StateInitializationContext; @@ -42,7 +41,6 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; import org.apache.flink.streaming.api.watermark.Watermark; @@ -218,22 +216,6 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab closeAll(sinkWriter, super::close); } - /** - * Skip registering numRecordsOut counter on output. - * - * <p>Metric "numRecordsOut" is defined as the total number of records written to the external - * system in FLIP-33, but this metric is occupied in AbstractStreamOperator as the number of - * records sent to downstream operators, which is number of Committable batches sent to - * SinkCommitter. So we skip registering this metric on output and leave this metric to sink - * writer implementations to report. - */ - @Override - protected Output<StreamRecord<CommittableMessage<CommT>>> registerCounterOnOutput( - Output<StreamRecord<CommittableMessage<CommT>>> output, - OperatorMetricGroup operatorMetricGroup) { - return output; - } - private void emit( int indexOfThisSubtask, int numberOfParallelSubtasks, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java index 8ab88a301a2..0bf275b2cd7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java @@ -20,10 +20,8 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.SimpleCounter; -import org.apache.flink.metrics.groups.OperatorIOMetricGroup; import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.streaming.api.operators.Input; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; @@ -40,33 +38,26 @@ class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> private static final Logger LOG = LoggerFactory.getLogger(ChainingOutput.class); protected final Input<T> input; + protected final Counter numRecordsOut; protected final Counter numRecordsIn; protected final WatermarkGauge watermarkGauge = new WatermarkGauge(); @Nullable protected final OutputTag<T> outputTag; protected WatermarkStatus announcedStatus = WatermarkStatus.ACTIVE; - public ChainingOutput(OneInputStreamOperator<T, ?> operator, @Nullable OutputTag<T> outputTag) { - this(operator, operator.getMetricGroup(), outputTag); - } - public ChainingOutput( Input<T> input, - OperatorMetricGroup operatorMetricGroup, + @Nullable Counter prevNumRecordsOut, + OperatorMetricGroup curOperatorMetricGroup, @Nullable OutputTag<T> outputTag) { this.input = input; - - { - Counter tmpNumRecordsIn; - try { - OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup.getIOMetricGroup(); - tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter(); - } catch (Exception e) { - LOG.warn("An exception occurred during the metrics setup.", e); - tmpNumRecordsIn = new SimpleCounter(); - } - numRecordsIn = tmpNumRecordsIn; + if (prevNumRecordsOut != null) { + this.numRecordsOut = prevNumRecordsOut; + } else { + // Uses a dummy counter here to avoid checking the existence of numRecordsOut on the + // per-record path. + this.numRecordsOut = new SimpleCounter(); } - + this.numRecordsIn = curOperatorMetricGroup.getIOMetricGroup().getNumRecordsInCounter(); this.outputTag = outputTag; } @@ -94,6 +85,7 @@ class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> @SuppressWarnings("unchecked") StreamRecord<T> castRecord = (StreamRecord<T>) record; + numRecordsOut.inc(); numRecordsIn.inc(); input.setKeyContextElement(castRecord); input.processElement(castRecord); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java index 16c87409736..c9091165ab7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java @@ -18,9 +18,9 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.streaming.api.operators.Input; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.OutputTag; @@ -30,20 +30,13 @@ final class CopyingChainingOutput<T> extends ChainingOutput<T> { private final TypeSerializer<T> serializer; - public CopyingChainingOutput( - OneInputStreamOperator<T, ?> operator, - TypeSerializer<T> serializer, - @Nullable OutputTag<T> outputTag) { - super(operator, outputTag); - this.serializer = serializer; - } - public CopyingChainingOutput( Input<T> input, TypeSerializer<T> serializer, - OperatorMetricGroup operatorMetricGroup, + @Nullable Counter prevRecordsOutCounter, + OperatorMetricGroup curOperatorMetricGroup, @Nullable OutputTag<T> outputTag) { - super(input, operatorMetricGroup, outputTag); + super(input, prevRecordsOutCounter, curOperatorMetricGroup, outputTag); this.serializer = serializer; } @@ -76,6 +69,7 @@ final class CopyingChainingOutput<T> extends ChainingOutput<T> { @SuppressWarnings("unchecked") StreamRecord<T> castRecord = (StreamRecord<T>) record; + numRecordsOut.inc(); numRecordsIn.inc(); StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue())); input.setKeyContextElement(copy); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index bec2c1d4a08..2321af83976 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; @@ -36,6 +37,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup; import org.apache.flink.runtime.operators.coordination.AcknowledgeCheckpointEvent; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher; @@ -46,6 +48,7 @@ import org.apache.flink.streaming.api.graph.NonChainedOutput; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.operators.BoundedMultiInput; +import org.apache.flink.streaming.api.operators.CountingOutput; import org.apache.flink.streaming.api.operators.Input; import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -58,6 +61,7 @@ import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput; +import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory; import org.apache.flink.util.FlinkException; @@ -199,7 +203,8 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>> userCodeClassloader, recordWriterOutputs, allOpWrappers, - containingTask.getMailboxExecutorFactory()); + containingTask.getMailboxExecutorFactory(), + operatorFactory != null); if (operatorFactory != null) { Tuple2<OP, Optional<ProcessingTimeService>> mainOperatorAndTimeService = @@ -622,6 +627,44 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>> return chainedSourceInputs; } + /** + * Get the numRecordsOut counter for the operator represented by the given config. And re-use + * the operator-level counter for the task-level numRecordsOut counter if this operator is at + * the end of the operator chain. + * + * <p>Return null if we should not use the numRecordsOut counter to track the records emitted by + * this operator. + */ + @Nullable + private Counter getOperatorRecordsOutCounter( + StreamTask<?, ?> containingTask, StreamConfig operatorConfig) { + ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader(); + StreamOperatorFactory<?> operatorFactory = + operatorConfig.getStreamOperatorFactory(userCodeClassloader); + // Do not use the numRecordsOut counter on output if this operator is SinkWriterOperator. + // + // Metric "numRecordsOut" is defined as the total number of records written to the + // external system in FLIP-33, but this metric is occupied in AbstractStreamOperator as the + // number of records sent to downstream operators, which is number of Committable batches + // sent to SinkCommitter. So we skip registering this metric on output and leave this metric + // to sink writer implementations to report. + if (operatorFactory instanceof SinkWriterOperatorFactory) { + return null; + } + + InternalOperatorMetricGroup operatorMetricGroup = + containingTask + .getEnvironment() + .getMetricGroup() + .getOrAddOperator( + operatorConfig.getOperatorID(), operatorConfig.getOperatorName()); + if (operatorConfig.isChainEnd()) { + operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask(); + } + + return operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter(); + } + @SuppressWarnings({"rawtypes", "unchecked"}) private WatermarkGaugeExposingOutput<StreamRecord> createChainedSourceOutput( StreamTask<?, OP> containingTask, @@ -631,14 +674,18 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>> OperatorMetricGroup metricGroup, OutputTag outputTag) { + Counter recordsOutCounter = getOperatorRecordsOutCounter(containingTask, sourceInputConfig); + WatermarkGaugeExposingOutput<StreamRecord> chainedSourceOutput; if (containingTask.getExecutionConfig().isObjectReuseEnabled()) { - chainedSourceOutput = new ChainingOutput(input, metricGroup, outputTag); + chainedSourceOutput = + new ChainingOutput(input, recordsOutCounter, metricGroup, outputTag); } else { TypeSerializer<?> inSerializer = sourceInputConfig.getTypeSerializerOut(userCodeClassloader); chainedSourceOutput = - new CopyingChainingOutput(input, inSerializer, metricGroup, outputTag); + new CopyingChainingOutput( + input, inSerializer, recordsOutCounter, metricGroup, outputTag); } /** * Chained sources are closed when {@link @@ -654,7 +701,8 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>> ClassLoader userCodeClassloader, Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs, List<StreamOperatorWrapper<?, ?>> allOperatorWrappers, - MailboxExecutorFactory mailboxExecutorFactory) { + MailboxExecutorFactory mailboxExecutorFactory, + boolean shouldAddMetric) { List<WatermarkGaugeExposingOutput<StreamRecord<T>>> allOutputs = new ArrayList<>(4); // create collectors for the network outputs @@ -675,18 +723,26 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>> WatermarkGaugeExposingOutput<StreamRecord<T>> output = createOperatorChain( containingTask, + operatorConfig, chainedOpConfig, chainedConfigs, userCodeClassloader, recordWriterOutputs, allOperatorWrappers, outputEdge.getOutputTag(), - mailboxExecutorFactory); + mailboxExecutorFactory, + shouldAddMetric); allOutputs.add(output); + // If the operator has multiple downstream chained operators, only one of them should + // increment the recordsOutCounter for this operator. Set shouldAddMetric to false + // so that we would skip adding the counter to other downstream operators. + shouldAddMetric = false; } + WatermarkGaugeExposingOutput<StreamRecord<T>> result; + if (allOutputs.size() == 1) { - return allOutputs.get(0); + result = allOutputs.get(0); } else { // send to N outputs. Note that this includes the special case // of sending to zero outputs @@ -700,11 +756,22 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>> // If the chaining output does not copy we need to copy in the broadcast output, // otherwise multi-chaining would not work correctly. if (containingTask.getExecutionConfig().isObjectReuseEnabled()) { - return closer.register(new CopyingBroadcastingOutputCollector<>(asArray)); + result = closer.register(new CopyingBroadcastingOutputCollector<>(asArray)); } else { - return closer.register(new BroadcastingOutputCollector<>(asArray)); + result = closer.register(new BroadcastingOutputCollector<>(asArray)); + } + } + + if (shouldAddMetric) { + // Create a CountingOutput to increment the recordsOutCounter for this operator + // if we have not added the counter to any downstream chained operator. + Counter recordsOutCounter = + getOperatorRecordsOutCounter(containingTask, operatorConfig); + if (recordsOutCounter != null) { + result = new CountingOutput<>(result, recordsOutCounter); } } + return result; } /** @@ -713,13 +780,15 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>> */ private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createOperatorChain( StreamTask<OUT, ?> containingTask, + StreamConfig prevOperatorConfig, StreamConfig operatorConfig, Map<Integer, StreamConfig> chainedConfigs, ClassLoader userCodeClassloader, Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs, List<StreamOperatorWrapper<?, ?>> allOperatorWrappers, OutputTag<IN> outputTag, - MailboxExecutorFactory mailboxExecutorFactory) { + MailboxExecutorFactory mailboxExecutorFactory, + boolean shouldAddMetricForPrevOperator) { // create the output that the operator writes to first. this may recursively create more // operators WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = @@ -730,7 +799,8 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>> userCodeClassloader, recordWriterOutputs, allOperatorWrappers, - mailboxExecutorFactory); + mailboxExecutorFactory, + true); OneInputStreamOperator<IN, OUT> chainedOperator = createOperator( @@ -742,7 +812,13 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>> false); return wrapOperatorIntoOutput( - chainedOperator, containingTask, operatorConfig, userCodeClassloader, outputTag); + chainedOperator, + containingTask, + prevOperatorConfig, + operatorConfig, + userCodeClassloader, + outputTag, + shouldAddMetricForPrevOperator); } /** @@ -786,17 +862,33 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>> private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> wrapOperatorIntoOutput( OneInputStreamOperator<IN, OUT> operator, StreamTask<OUT, ?> containingTask, + StreamConfig prevOperatorConfig, StreamConfig operatorConfig, ClassLoader userCodeClassloader, - OutputTag<IN> outputTag) { + OutputTag<IN> outputTag, + boolean shouldAddMetricForPrevOperator) { + + Counter recordsOutCounter = null; + + if (shouldAddMetricForPrevOperator) { + recordsOutCounter = getOperatorRecordsOutCounter(containingTask, prevOperatorConfig); + } WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput; if (containingTask.getExecutionConfig().isObjectReuseEnabled()) { - currentOperatorOutput = new ChainingOutput<>(operator, outputTag); + currentOperatorOutput = + new ChainingOutput<>( + operator, recordsOutCounter, operator.getMetricGroup(), outputTag); } else { TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader); - currentOperatorOutput = new CopyingChainingOutput<>(operator, inSerializer, outputTag); + currentOperatorOutput = + new CopyingChainingOutput<>( + operator, + inSerializer, + recordsOutCounter, + operator.getMetricGroup(), + outputTag); } // wrap watermark gauges since registered metrics must be unique diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java index 55da7e7d8c7..c48b0843a7c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java @@ -98,7 +98,7 @@ public class OperatorChainTest { if (op instanceof SetupableStreamOperator) { ((SetupableStreamOperator) op).setup(containingTask, cfg, lastWriter); } - lastWriter = new ChainingOutput<>(op, null); + lastWriter = new ChainingOutput<>(op, null, op.getMetricGroup(), null); ProcessingTimeService processingTimeService = null; if (op instanceof AbstractStreamOperator) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 38523c18431..51f3d0d2311 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -67,6 +67,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable; import org.apache.flink.runtime.metrics.TimerGauge; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.runtime.operators.testutils.MockEnvironment; @@ -1907,6 +1908,8 @@ public class StreamTaskTest extends TestLogger { any(CheckpointOptions.class), any(CheckpointStreamFactory.class))) .thenReturn(operatorSnapshotResult); + when(operator.getMetricGroup()) + .thenReturn(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()); return operator; } @@ -1923,6 +1926,8 @@ public class StreamTaskTest extends TestLogger { any(CheckpointOptions.class), any(CheckpointStreamFactory.class))) .thenThrow(exception); + when(operator.getMetricGroup()) + .thenReturn(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()); return operator; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java index bcec491d82a..734e1400240 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingRes import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; @@ -810,7 +811,7 @@ public class SubtaskCheckpointCoordinatorTest { @Override public OperatorMetricGroup getMetricGroup() { - return null; + return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); } @Override