This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch FLINK-18808-record-out in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9be2171a8fbeac72e34def779e1003bf099ab802 Author: Weijie Guo <res...@163.com> AuthorDate: Wed Apr 26 19:39:51 2023 +0800 [FLINK-18808][streaming] Include side outputs in numRecordsOut metric Co-authored-by: 李明 <limin...@meituan.com> --- .../streaming/runtime/io/RecordWriterOutput.java | 18 ++- .../runtime/tasks/BroadcastingOutputCollector.java | 24 +++- .../tasks/CopyingBroadcastingOutputCollector.java | 41 ++++--- .../streaming/runtime/tasks/OperatorChain.java | 39 ++++-- .../runtime/tasks/MultipleInputStreamTaskTest.java | 131 +++++++++++++++++++++ .../runtime/tasks/OneInputStreamTaskTest.java | 88 ++++++++++++++ .../streaming/runtime/tasks/OperatorChainTest.java | 5 +- .../runtime/tasks/TwoInputStreamTaskTest.java | 103 ++++++++++++++++ 8 files changed, 415 insertions(+), 34 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java index 46fc01231be..6d1ae671153 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -19,7 +19,9 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -60,6 +62,10 @@ public class RecordWriterOutput<OUT> private WatermarkStatus announcedStatus = WatermarkStatus.ACTIVE; + // Uses a dummy counter here to avoid checking the existence of numRecordsOut on the + // per-record path. + private Counter numRecordsOut = new SimpleCounter(); + @SuppressWarnings("unchecked") public RecordWriterOutput( RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter, @@ -86,12 +92,16 @@ public class RecordWriterOutput<OUT> @Override public void collect(StreamRecord<OUT> record) { - collectAndCheckIfCountNeeded(record); + if (collectAndCheckIfCountNeeded(record)) { + numRecordsOut.inc(); + } } @Override public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { - collectAndCheckIfCountNeeded(outputTag, record); + if (collectAndCheckIfCountNeeded(outputTag, record)) { + numRecordsOut.inc(); + } } @Override @@ -168,6 +178,10 @@ public class RecordWriterOutput<OUT> } } + public void setNumRecordsOut(Counter numRecordsOut) { + this.numRecordsOut = checkNotNull(numRecordsOut); + } + public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException { if (isPriorityEvent && event instanceof CheckpointBarrier diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java index 344215f0e51..2ddc8781eb7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.runtime.tasks; +import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; @@ -31,12 +32,15 @@ import java.util.Random; class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> { - protected final Output<StreamRecord<T>>[] outputs; + protected final OutputWithChainingCheck<StreamRecord<T>>[] outputs; private final Random random = new XORShiftRandom(); private final WatermarkGauge watermarkGauge = new WatermarkGauge(); + protected final Counter numRecordsOutForTask; - public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) { + public BroadcastingOutputCollector( + OutputWithChainingCheck<StreamRecord<T>>[] outputs, Counter numRecordsOutForTask) { this.outputs = outputs; + this.numRecordsOutForTask = numRecordsOutForTask; } @Override @@ -73,15 +77,23 @@ class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<Str @Override public void collect(StreamRecord<T> record) { - for (Output<StreamRecord<T>> output : outputs) { - output.collect(record); + boolean emitted = false; + for (OutputWithChainingCheck<StreamRecord<T>> output : outputs) { + emitted |= output.collectAndCheckIfCountNeeded(record); + } + if (emitted) { + numRecordsOutForTask.inc(); } } @Override public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { - for (Output<StreamRecord<T>> output : outputs) { - output.collect(outputTag, record); + boolean emitted = false; + for (OutputWithChainingCheck<StreamRecord<T>> output : outputs) { + emitted |= output.collectAndCheckIfCountNeeded(outputTag, record); + } + if (emitted) { + numRecordsOutForTask.inc(); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingBroadcastingOutputCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingBroadcastingOutputCollector.java index 9e179b4c464..a70fde3901d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingBroadcastingOutputCollector.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingBroadcastingOutputCollector.java @@ -17,7 +17,7 @@ package org.apache.flink.streaming.runtime.tasks; -import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.metrics.Counter; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.OutputTag; @@ -27,37 +27,48 @@ import org.apache.flink.util.OutputTag; */ final class CopyingBroadcastingOutputCollector<T> extends BroadcastingOutputCollector<T> { - public CopyingBroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) { - super(outputs); + public CopyingBroadcastingOutputCollector( + OutputWithChainingCheck<StreamRecord<T>>[] allOutputs, Counter numRecordsOutForTask) { + super(allOutputs, numRecordsOutForTask); } @Override public void collect(StreamRecord<T> record) { + boolean emitted = false; + int length = outputs.length; - for (int i = 0; i < outputs.length - 1; i++) { - Output<StreamRecord<T>> output = outputs[i]; + for (int i = 0; i < length - 1; i++) { + OutputWithChainingCheck<StreamRecord<T>> output = outputs[i]; StreamRecord<T> shallowCopy = record.copy(record.getValue()); - output.collect(shallowCopy); + emitted |= output.collectAndCheckIfCountNeeded(shallowCopy); } - if (outputs.length > 0) { - // don't copy for the last output - outputs[outputs.length - 1].collect(record); + if (length > 0) { + emitted |= outputs[length - 1].collectAndCheckIfCountNeeded(record); + } + + if (emitted) { + numRecordsOutForTask.inc(); } } @Override public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { - for (int i = 0; i < outputs.length - 1; i++) { - Output<StreamRecord<T>> output = outputs[i]; + boolean emitted = false; + int length = outputs.length; + for (int i = 0; i < length - 1; i++) { + OutputWithChainingCheck<StreamRecord<T>> output = outputs[i]; StreamRecord<X> shallowCopy = record.copy(record.getValue()); - output.collect(outputTag, shallowCopy); + emitted |= output.collectAndCheckIfCountNeeded(outputTag, shallowCopy); + } + + if (length > 0) { + emitted |= outputs[length - 1].collectAndCheckIfCountNeeded(outputTag, record); } - if (outputs.length > 0) { - // don't copy for the last output - outputs[outputs.length - 1].collect(outputTag, record); + if (emitted) { + numRecordsOutForTask.inc(); } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 2321af83976..0f27f2a06d5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; @@ -38,6 +39,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.operators.coordination.AcknowledgeCheckpointEvent; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher; @@ -658,9 +660,6 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>> .getMetricGroup() .getOrAddOperator( operatorConfig.getOperatorID(), operatorConfig.getOperatorName()); - if (operatorConfig.isChainEnd()) { - operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask(); - } return operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter(); } @@ -703,7 +702,7 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>> List<StreamOperatorWrapper<?, ?>> allOperatorWrappers, MailboxExecutorFactory mailboxExecutorFactory, boolean shouldAddMetric) { - List<WatermarkGaugeExposingOutput<StreamRecord<T>>> allOutputs = new ArrayList<>(4); + List<OutputWithChainingCheck<StreamRecord<T>>> allOutputs = new ArrayList<>(4); // create collectors for the network outputs for (NonChainedOutput streamOutput : @@ -732,7 +731,8 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>> outputEdge.getOutputTag(), mailboxExecutorFactory, shouldAddMetric); - allOutputs.add(output); + checkState(output instanceof OutputWithChainingCheck); + allOutputs.add((OutputWithChainingCheck) output); // If the operator has multiple downstream chained operators, only one of them should // increment the recordsOutCounter for this operator. Set shouldAddMetric to false // so that we would skip adding the counter to other downstream operators. @@ -743,22 +743,35 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>> if (allOutputs.size() == 1) { result = allOutputs.get(0); + // only if this is a single RecordWriterOutput, reuse its numRecordOut for task. + if (result instanceof RecordWriterOutput) { + Counter numRecordsOutCounter = createNumRecordsOutCounter(containingTask); + ((RecordWriterOutput<T>) result).setNumRecordsOut(numRecordsOutCounter); + } } else { // send to N outputs. Note that this includes the special case // of sending to zero outputs @SuppressWarnings({"unchecked"}) - Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()]; + OutputWithChainingCheck<StreamRecord<T>>[] allOutputsArray = + new OutputWithChainingCheck[allOutputs.size()]; for (int i = 0; i < allOutputs.size(); i++) { - asArray[i] = allOutputs.get(i); + allOutputsArray[i] = allOutputs.get(i); } // This is the inverse of creating the normal ChainingOutput. // If the chaining output does not copy we need to copy in the broadcast output, // otherwise multi-chaining would not work correctly. + Counter numRecordsOutForTask = createNumRecordsOutCounter(containingTask); if (containingTask.getExecutionConfig().isObjectReuseEnabled()) { - result = closer.register(new CopyingBroadcastingOutputCollector<>(asArray)); + result = + closer.register( + new CopyingBroadcastingOutputCollector<>( + allOutputsArray, numRecordsOutForTask)); } else { - result = closer.register(new BroadcastingOutputCollector<>(asArray)); + result = + closer.register( + new BroadcastingOutputCollector<>( + allOutputsArray, numRecordsOutForTask)); } } @@ -774,6 +787,14 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>> return result; } + private static Counter createNumRecordsOutCounter(StreamTask<?, ?> containingTask) { + TaskIOMetricGroup taskIOMetricGroup = + containingTask.getEnvironment().getMetricGroup().getIOMetricGroup(); + Counter counter = new SimpleCounter(); + taskIOMetricGroup.reuseRecordsOutputCounter(counter); + return counter; + } + /** * Recursively create chain of operators that starts from the given {@param operatorConfig}. * Operators are created tail to head and wrapped into an {@link WatermarkGaugeExposingOutput}. diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java index 8c46fd5fb92..c458b598095 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java @@ -56,6 +56,7 @@ import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.EndOfData; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.StopMode; +import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriterWithAvailabilityHelper; import org.apache.flink.runtime.io.network.partition.PartitionTestUtils; @@ -84,12 +85,14 @@ import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.Input; import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.SourceOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTaskChainedSourcesCheckpointingTest.LifeCycleMonitorMultipleInputOperatorFactory; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest.WatermarkMetricOperator; @@ -98,6 +101,7 @@ import org.apache.flink.streaming.util.CompletingCheckpointResponder; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.testutils.junit.SharedObjects; import org.apache.flink.testutils.junit.SharedReference; +import org.apache.flink.util.OutputTag; import org.apache.flink.util.SerializedValue; import org.hamcrest.collection.IsMapContaining; @@ -1391,6 +1395,133 @@ public class MultipleInputStreamTaskTest { new SerializedValue<>(new NoMoreSplitsEvent())); } + @Test + public void testTaskSideOutputStatistics() throws Exception { + TaskMetricGroup taskMetricGroup = + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); + + ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3]; + for (int i = 0; i < partitionWriters.length; ++i) { + partitionWriters[i] = + new RecordOrEventCollectingResultPartitionWriter<>( + new ArrayDeque<>(), + new StreamElementSerializer<>( + BasicTypeInfo.INT_TYPE_INFO.createSerializer( + new ExecutionConfig()))); + partitionWriters[i].setup(); + } + + try (StreamTaskMailboxTestHarness<Integer> testHarness = + new StreamTaskMailboxTestHarnessBuilder<>( + MultipleInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO) + .modifyExecutionConfig(applyObjectReuse(objectReuse)) + .addInput(BasicTypeInfo.INT_TYPE_INFO) + .addInput(BasicTypeInfo.INT_TYPE_INFO) + .addInput(BasicTypeInfo.INT_TYPE_INFO) + .addAdditionalOutput(partitionWriters) + .setupOperatorChain(new OddEvenOperatorFactory()) + .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())) + .setOperatorFactory( + SimpleOperatorFactory.of( + new OneInputStreamTaskTest.OddEvenOperator())) + .addNonChainedOutputsCount( + new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2) + .addNonChainedOutputsCount(1) + .build() + .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())) + .setOperatorFactory( + SimpleOperatorFactory.of( + new OneInputStreamTaskTest.DuplicatingOperator())) + .addNonChainedOutputsCount(1) + .build() + .finish() + .setTaskMetricGroup(taskMetricGroup) + .build()) { + Counter numRecordsInCounter = + taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter(); + Counter numRecordsOutCounter = + taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter(); + + int numOddRecords = 5; + int numEvenRecords = 3; + int numNaturalRecords = 2; + for (int x = 0; x < numOddRecords; x++) { + testHarness.processElement(new StreamRecord<>(x * 2 + 1)); + } + for (int x = 0; x < numEvenRecords; x++) { + testHarness.processElement(new StreamRecord<>(x * 2)); + } + for (int x = 0; x < numNaturalRecords; x++) { + testHarness.processElement(new StreamRecord<>(x)); + } + + int totalOddRecords = numOddRecords + numNaturalRecords / 2; + int totalEvenRecords = numEvenRecords + (int) Math.ceil(numNaturalRecords / 2.0); + assertEquals(totalOddRecords + totalEvenRecords, numRecordsInCounter.getCount()); + assertEquals( + totalOddRecords + + (totalOddRecords + totalEvenRecords) + + (totalOddRecords + totalEvenRecords) * 2, + numRecordsOutCounter.getCount()); + testHarness.waitForTaskCompletion(); + } finally { + for (ResultPartitionWriter partitionWriter : partitionWriters) { + partitionWriter.close(); + } + } + } + + static class OddEvenOperator extends AbstractStreamOperatorV2<Integer> + implements MultipleInputStreamOperator<Integer> { + + public OddEvenOperator(StreamOperatorParameters<Integer> parameters) { + super(parameters, 3); + } + + @Override + public List<Input> getInputs() { + return Arrays.asList( + new OddEvenInput(this, 1), + new OddEvenInput(this, 2), + new OddEvenInput(this, 3)); + } + + static class OddEvenInput extends AbstractInput<Integer, Integer> { + private final OutputTag<Integer> oddOutputTag = + new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO); + private final OutputTag<Integer> evenOutputTag = + new OutputTag<>("even", BasicTypeInfo.INT_TYPE_INFO); + + public OddEvenInput(AbstractStreamOperatorV2<Integer> owner, int inputId) { + super(owner, inputId); + } + + @Override + public void processElement(StreamRecord<Integer> element) throws Exception { + if (element.getValue() % 2 == 0) { + output.collect(evenOutputTag, element); + } else { + output.collect(oddOutputTag, element); + } + output.collect(element); + } + } + } + + private static class OddEvenOperatorFactory extends AbstractStreamOperatorFactory<Integer> { + @Override + public <T extends StreamOperator<Integer>> T createStreamOperator( + StreamOperatorParameters<Integer> parameters) { + return (T) new OddEvenOperator(parameters); + } + + @Override + public Class<? extends StreamOperator<Integer>> getStreamOperatorClass( + ClassLoader classLoader) { + return OddEvenOperator.class; + } + } + static class LifeCycleTrackingMapToStringMultipleInputOperator extends MapToStringMultipleInputOperator implements BoundedMultiInput { public static final String OPEN = "MultipleInputOperator#open"; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index f8e60522b31..691718adf78 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -40,6 +40,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.io.AvailabilityProvider; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriterWithAvailabilityHelper; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.MetricNames; @@ -48,6 +50,7 @@ import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup; import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; @@ -59,13 +62,16 @@ import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.graph.StreamNode; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.OutputTag; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; @@ -996,6 +1002,88 @@ public class OneInputStreamTaskTest extends TestLogger { } } + @Test + public void testTaskSideOutputStatistics() throws Exception { + TaskMetricGroup taskMetricGroup = + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); + + ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3]; + for (int i = 0; i < partitionWriters.length; ++i) { + partitionWriters[i] = + new RecordOrEventCollectingResultPartitionWriter<>( + new ArrayDeque<>(), + new StreamElementSerializer<>( + BasicTypeInfo.INT_TYPE_INFO.createSerializer( + new ExecutionConfig()))); + partitionWriters[i].setup(); + } + + try (StreamTaskMailboxTestHarness<Integer> testHarness = + new StreamTaskMailboxTestHarnessBuilder<>( + OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO) + .addInput(BasicTypeInfo.INT_TYPE_INFO) + .addAdditionalOutput(partitionWriters) + .setupOperatorChain(new OperatorID(), new OddEvenOperator()) + .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())) + .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator())) + .addNonChainedOutputsCount( + new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2) + .addNonChainedOutputsCount(1) + .build() + .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())) + .setOperatorFactory(SimpleOperatorFactory.of(new DuplicatingOperator())) + .addNonChainedOutputsCount(1) + .build() + .finish() + .setTaskMetricGroup(taskMetricGroup) + .build()) { + Counter numRecordsInCounter = + taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter(); + Counter numRecordsOutCounter = + taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter(); + + final int numEvenRecords = 5; + final int numOddRecords = 3; + + for (int x = 0; x < numEvenRecords; x++) { + testHarness.processElement(new StreamRecord<>(2 * x)); + } + + for (int x = 0; x < numOddRecords; x++) { + testHarness.processElement(new StreamRecord<>(2 * x + 1)); + } + assertEquals(numOddRecords + numEvenRecords, numRecordsInCounter.getCount()); + assertEquals( + numOddRecords + + (numOddRecords + numEvenRecords) + + (numOddRecords + numEvenRecords) * 2, + numRecordsOutCounter.getCount()); + testHarness.waitForTaskCompletion(); + } finally { + for (ResultPartitionWriter partitionWriter : partitionWriters) { + partitionWriter.close(); + } + } + } + + static class OddEvenOperator extends AbstractStreamOperator<Integer> + implements OneInputStreamOperator<Integer, Integer> { + private final OutputTag<Integer> oddOutputTag = + new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO); + private final OutputTag<Integer> evenOutputTag = + new OutputTag<>("even", BasicTypeInfo.INT_TYPE_INFO); + + @Override + public void processElement(StreamRecord<Integer> element) { + if (element.getValue() % 2 == 0) { + output.collect(evenOutputTag, element); + } else { + output.collect(oddOutputTag, element); + } + output.collect(element); + } + } + static class WatermarkMetricOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java index c48b0843a7c..27ac625cf63 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java @@ -20,12 +20,12 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.SetupableStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.runtime.io.RecordWriterOutput; @@ -89,7 +89,8 @@ public class OperatorChainTest { // initial output goes to nowhere @SuppressWarnings({"unchecked", "rawtypes"}) WatermarkGaugeExposingOutput<StreamRecord<T>> lastWriter = - new BroadcastingOutputCollector<>(new Output[0]); + new BroadcastingOutputCollector<>( + new OutputWithChainingCheck[0], new SimpleCounter()); // build the reverse operators array for (int i = 0; i < operators.length; i++) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java index 9d215a4b1a6..76130881c57 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java @@ -35,6 +35,8 @@ import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.EndOfData; import org.apache.flink.runtime.io.network.api.StopMode; +import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriterWithAvailabilityHelper; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.MetricNames; @@ -43,6 +45,7 @@ import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup; import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; @@ -53,17 +56,21 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.InputSelectable; import org.apache.flink.streaming.api.operators.InputSelection; +import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.operators.co.CoStreamMap; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.OutputTag; import org.hamcrest.collection.IsMapContaining; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -825,6 +832,102 @@ public class TwoInputStreamTaskTest { } } + @Test + public void testTaskSideOutputStatistics() throws Exception { + TaskMetricGroup taskMetricGroup = + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); + + ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3]; + for (int i = 0; i < partitionWriters.length; ++i) { + partitionWriters[i] = + new RecordOrEventCollectingResultPartitionWriter<>( + new ArrayDeque<>(), + new StreamElementSerializer<>( + BasicTypeInfo.INT_TYPE_INFO.createSerializer( + new ExecutionConfig()))); + partitionWriters[i].setup(); + } + + try (StreamTaskMailboxTestHarness<Integer> testHarness = + new StreamTaskMailboxTestHarnessBuilder<>( + TwoInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO) + .addInput(BasicTypeInfo.INT_TYPE_INFO) + .addInput(BasicTypeInfo.INT_TYPE_INFO) + .addAdditionalOutput(partitionWriters) + .setupOperatorChain(new OperatorID(), new OddEvenOperator()) + .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())) + .setOperatorFactory( + SimpleOperatorFactory.of( + new OneInputStreamTaskTest.OddEvenOperator())) + .addNonChainedOutputsCount( + new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2) + .addNonChainedOutputsCount(1) + .build() + .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())) + .setOperatorFactory( + SimpleOperatorFactory.of( + new OneInputStreamTaskTest.DuplicatingOperator())) + .addNonChainedOutputsCount(1) + .build() + .finish() + .setTaskMetricGroup(taskMetricGroup) + .build()) { + Counter numRecordsInCounter = + taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter(); + Counter numRecordsOutCounter = + taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter(); + + final int numEvenRecords = 5; + final int numOddRecords = 3; + + for (int x = 0; x < numEvenRecords; x++) { + testHarness.processElement(new StreamRecord<>(2 * x)); + } + + for (int x = 0; x < numOddRecords; x++) { + testHarness.processElement(new StreamRecord<>(2 * x + 1)); + } + assertEquals(numOddRecords + numEvenRecords, numRecordsInCounter.getCount()); + assertEquals( + numOddRecords + + (numOddRecords + numEvenRecords) + + (numOddRecords + numEvenRecords) * 2, + numRecordsOutCounter.getCount()); + testHarness.waitForTaskCompletion(); + } finally { + for (ResultPartitionWriter partitionWriter : partitionWriters) { + partitionWriter.close(); + } + } + } + + static class OddEvenOperator extends AbstractStreamOperator<Integer> + implements TwoInputStreamOperator<Integer, Integer, Integer> { + private final OutputTag<Integer> oddOutputTag = + new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO); + private final OutputTag<Integer> evenOutputTag = + new OutputTag<>("even", BasicTypeInfo.INT_TYPE_INFO); + + @Override + public void processElement1(StreamRecord<Integer> element) throws Exception { + processElement(element); + } + + @Override + public void processElement2(StreamRecord<Integer> element) throws Exception { + processElement(element); + } + + private void processElement(StreamRecord<Integer> element) { + if (element.getValue() % 2 == 0) { + output.collect(evenOutputTag, element); + } else { + output.collect(oddOutputTag, element); + } + output.collect(element); + } + } + // This must only be used in one test, otherwise the static fields will be changed // by several tests concurrently private static class TestOpenCloseMapFunction