rkhachatryan commented on a change in pull request #13466: URL: https://github.com/apache/flink/pull/13466#discussion_r494815993
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java ########## @@ -68,7 +69,9 @@ */ private final StreamStatus[] streamStatuses; - private final Counter numRecordsIn; + private final Counter networkRecordsIn = new SimpleCounter(); + + private final Counter mainOperatorRecordsIn; Review comment: Can you elaborate on this change in the commit message please? `[FLINK-18907][task] Fix numRecordsIn metric with chained sources` ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java ########## @@ -726,15 +803,188 @@ public void processElement(StreamRecord<T> element) throws Exception { * Factory for {@link MapToStringMultipleInputOperator}. */ protected static class MapToStringMultipleInputOperatorFactory extends AbstractStreamOperatorFactory<String> { + private final int numberOfInputs; + + public MapToStringMultipleInputOperatorFactory(int numberOfInputs) { + this.numberOfInputs = numberOfInputs; + } + @Override public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) { - return (T) new MapToStringMultipleInputOperator(parameters); + return (T) new MapToStringMultipleInputOperator(parameters, numberOfInputs); } @Override public Class<? 
extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) { return MapToStringMultipleInputOperator.class; } } + + static StreamTaskMailboxTestHarness<String> buildTestHarness() throws Exception { + return buildTestHarness(false); + } + + static StreamTaskMailboxTestHarness<String> buildTestHarness(boolean unaligned) throws Exception { + return new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO) + .modifyExecutionConfig(config -> config.enableObjectReuse()) + .modifyStreamConfig(config -> config.setUnalignedCheckpointsEnabled(unaligned)) + .addInput(BasicTypeInfo.STRING_TYPE_INFO) + .addSourceInput( + new SourceOperatorFactory<>( + new MockSource(Boundedness.BOUNDED, 1), + WatermarkStrategy.noWatermarks())) + .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO) + .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory(3)) + .build(); + } + + static void addSourceRecords( + StreamTaskMailboxTestHarness<String> testHarness, + int sourceId, + int... records) throws Exception { + OperatorID sourceOperatorID = getSourceOperatorID(testHarness, sourceId); + + // Prepare the source split and assign it to the source reader. + MockSourceSplit split = new MockSourceSplit(0, 0, records.length); + for (int record : records) { + split.addRecord(record); + } + + // Assign the split to the source reader. 
+ AddSplitEvent<MockSourceSplit> addSplitEvent = + new AddSplitEvent<>(Collections.singletonList(split), new MockSourceSplitSerializer()); + + testHarness.getStreamTask().dispatchOperatorEvent( + sourceOperatorID, + new SerializedValue<>(addSplitEvent)); + } + + private static OperatorID getSourceOperatorID(StreamTaskMailboxTestHarness<String> testHarness, int sourceId) { + StreamConfig.InputConfig[] inputs = testHarness.getStreamTask().getConfiguration().getInputs(testHarness.getClass().getClassLoader()); + StreamConfig.SourceInputConfig input = (StreamConfig.SourceInputConfig) inputs[sourceId]; + return testHarness.getStreamTask().operatorChain.getSourceTaskInput(input).getOperatorID(); + } + + private void finishAddingRecords(StreamTaskMailboxTestHarness<String> testHarness, int sourceId) throws Exception { + testHarness.getStreamTask().dispatchOperatorEvent( + getSourceOperatorID(testHarness, sourceId), + new SerializedValue<>(new SourceEventWrapper(new MockNoMoreSplitsEvent()))); + } + + static class LifeCycleTrackingMapToStringMultipleInputOperator + extends MapToStringMultipleInputOperator implements BoundedMultiInput { + public static final String OPEN = "MultipleInputOperator#open"; + public static final String CLOSE = "MultipleInputOperator#close"; + public static final String END_INPUT = "MultipleInputOperator#endInput"; + + private static final long serialVersionUID = 1L; + + public LifeCycleTrackingMapToStringMultipleInputOperator(StreamOperatorParameters<String> parameters) { + super(parameters, 3); + } + + @Override + public void open() throws Exception { + LIFE_CYCLE_EVENTS.add(OPEN); + super.open(); + } + + @Override + public void close() throws Exception { + LIFE_CYCLE_EVENTS.add(CLOSE); + super.close(); + } + + @Override + public void endInput(int inputId) { + LIFE_CYCLE_EVENTS.add(END_INPUT); + } + } + + static class LifeCycleTrackingMapToStringMultipleInputOperatorFactory extends AbstractStreamOperatorFactory<String> { + @Override + public <T 
extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) { + return (T) new LifeCycleTrackingMapToStringMultipleInputOperator(parameters); + } + + @Override + public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) { + return LifeCycleTrackingMapToStringMultipleInputOperator.class; + } + } + + static class LifeCycleTrackingMockSource extends MockSource { + public LifeCycleTrackingMockSource(Boundedness boundedness, int numSplits) { + super(boundedness, numSplits); + } + + @Override + public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext readerContext) { + LifeCycleTrackingMockSourceReader sourceReader = new LifeCycleTrackingMockSourceReader(); + createdReaders.add(sourceReader); + return sourceReader; + } + } + + static class LifeCycleTrackingMockSourceReader extends MockSourceReader { + public static final String START = "SourceReader#start"; + public static final String CLOSE = "SourceReader#close"; + + @Override + public void start() { + LIFE_CYCLE_EVENTS.add(START); + super.start(); + } + + @Override + public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) throws Exception { + return super.pollNext(sourceOutput); Review comment: Why do we need this override? ########## File path: flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java ########## @@ -68,7 +78,16 @@ public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) throws Exception return InputStatus.MORE_AVAILABLE; } else { // In case no split has available record, return depending on whether all the splits has finished. - return finished ? InputStatus.END_OF_INPUT : InputStatus.NOTHING_AVAILABLE; + if (finished) { Review comment: nit: flatten nested `else { if {} }` to `else if {}`? 
########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java ########## @@ -210,45 +233,63 @@ public void testOvertakingCheckpointBarriers() throws Exception { } @Test - public void testOperatorMetricReuse() throws Exception { + public void testMetrics() throws Exception { + + HashMap<String, OperatorMetricGroup> operatorMetrics = new HashMap<>(); TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() { @Override public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) { - return new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, name); + OperatorMetricGroup operatorMetricGroup = new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, name); + operatorMetrics.put(name, operatorMetricGroup); + return operatorMetricGroup; } }; + String mainOperatorName = "MainOperator"; try (StreamTaskMailboxTestHarness<String> testHarness = new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO) + .modifyExecutionConfig(config -> config.enableObjectReuse()) .addInput(BasicTypeInfo.STRING_TYPE_INFO) + .addSourceInput( + new SourceOperatorFactory<>( + new LifeCycleTrackingMockSource(Boundedness.BOUNDED, 1), + WatermarkStrategy.noWatermarks())) .addInput(BasicTypeInfo.STRING_TYPE_INFO) - .addInput(BasicTypeInfo.STRING_TYPE_INFO) - .setupOperatorChain(new DuplicatingOperatorFactory()) + .setupOperatorChain(new MapToStringMultipleInputOperatorFactory(3)) + .name(mainOperatorName) + .chain(new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())) .chain(new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())) .chain(new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())) .finish() 
.setTaskMetricGroup(taskMetricGroup) .build()) { + + assertTrue(operatorMetrics.containsKey(mainOperatorName)); + OperatorMetricGroup mainOperatorMetrics = operatorMetrics.get(mainOperatorName); Counter numRecordsInCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter(); Counter numRecordsOutCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter(); int numRecords1 = 5; int numRecords2 = 3; int numRecords3 = 2; + // add source splits before processing any elements, so the MockSourceReader does not end prematurely + for (int x = 0; x < numRecords2; x++) { + addSourceRecords(testHarness, 1, 42); + } for (int x = 0; x < numRecords1; x++) { testHarness.processElement(new StreamRecord<>("hello"), 0, 0); } - for (int x = 0; x < numRecords2; x++) { - testHarness.processElement(new StreamRecord<>("hello"), 1, 0); - } for (int x = 0; x < numRecords3; x++) { - testHarness.processElement(new StreamRecord<>("hello"), 2, 0); + testHarness.processElement(new StreamRecord<>("hello"), 1, 0); } - int totalRecords = numRecords1 + numRecords2 + numRecords3; - assertEquals(totalRecords, numRecordsInCounter.getCount()); - assertEquals((totalRecords) * 2 * 2 * 2, numRecordsOutCounter.getCount()); + int networkRecordsIn = numRecords1 + numRecords3; + int mainOpeartorRecordsIn = networkRecordsIn + numRecords2; + int totalRecordsOut = (networkRecordsIn + numRecords2) * 2 * 2 * 2; Review comment: Can you add a comment explaining this line? 
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java ########## @@ -63,13 +66,22 @@ public void init() { public static class AsyncDataOutputToOutput<T> extends AbstractDataOutput<T> { private final Output<StreamRecord<T>> output; + @Nullable private final WatermarkGauge inputWatermarkGauge; public AsyncDataOutputToOutput( Output<StreamRecord<T>> output, StreamStatusMaintainer streamStatusMaintainer) { + this(output, streamStatusMaintainer, null); Review comment: How about inlining this constructor (and perhaps adding a comment at the call site) to make it clear when (and why) there is no `inputWatermarkGauge`? ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java ########## @@ -417,20 +451,22 @@ public void testWatermark() throws Exception { // advance watermark from one of the inputs, now we should get a new one since the // minimum increases - testHarness.processElement(new Watermark(initialTime + 4), 2, 1); + testHarness.processElement(new Watermark(initialTime + 4), 1, 1); expectedOutput.add(new Watermark(initialTime + 3)); assertThat(testHarness.getOutput(), contains(expectedOutput.toArray())); - // advance the other two inputs, now we should get a new one since the - // minimum increases again + // advance the other inputs, now we should get a new one since the minimum increases again testHarness.processElement(new Watermark(initialTime + 4), 0, 1); + + addSourceRecords(testHarness, 1, initialTime + 4); + expectedOutput.add(new StreamRecord<>("" + (initialTime + 4), TimestampAssigner.NO_TIMESTAMP)); + testHarness.processElement(new Watermark(initialTime + 4), 1, 0); - testHarness.processElement(new Watermark(initialTime + 4), 2, 0); expectedOutput.add(new Watermark(initialTime + 4)); assertThat(testHarness.getOutput(), contains(expectedOutput.toArray())); List<String> resultElements =
TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput()); - assertEquals(2, resultElements.size()); + assertEquals(5, resultElements.size()); Review comment: The method is now ~70 LOC, with 7 assertions and a rather generic name (`testWatermark`). I think it could be shortened by extracting a method for the recurring sequence: `processElement`, `expectedOutput.add`, and asserting that the output matches. ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java ########## @@ -443,38 +479,66 @@ public void testWatermark() throws Exception { public void testWatermarkAndStreamStatusForwarding() throws Exception { try (StreamTaskMailboxTestHarness<String> testHarness = new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO) + .modifyExecutionConfig(config -> config.enableObjectReuse()) .addInput(BasicTypeInfo.STRING_TYPE_INFO, 2) - .addInput(BasicTypeInfo.INT_TYPE_INFO, 2) + .addSourceInput( + new SourceOperatorFactory<>( + new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 2, true, true), + WatermarkStrategy.forGenerator(ctx -> new RecordToWatermarkGenerator()))) .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2) .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory(3)) .build()) { ArrayDeque<Object> expectedOutput = new ArrayDeque<>(); - long initialTime = 0L; + int initialTime = 0; // test whether idle input channels are acknowledged correctly when forwarding watermarks testHarness.processElement(StreamStatus.IDLE, 0, 1); - testHarness.processElement(StreamStatus.IDLE, 1, 1); - testHarness.processElement(StreamStatus.IDLE, 2, 0); testHarness.processElement(new Watermark(initialTime + 6), 0, 0); - testHarness.processElement(new Watermark(initialTime + 6), 1, 0); - testHarness.processElement(new Watermark(initialTime + 5), 2, 1); // this watermark should be advanced first - testHarness.processElement(StreamStatus.IDLE, 2, 1); // once this is acknowledged, +
testHarness.processElement(new Watermark(initialTime + 5), 1, 1); // this watermark should be advanced first + testHarness.processElement(StreamStatus.IDLE, 1, 0); // once this is acknowledged, - expectedOutput.add(new Watermark(initialTime + 5)); // We don't expect to see Watermark(6) here because the idle status of one // input doesn't propagate to the other input. That is, if input 1 is at WM 6 and input // two was at WM 5 before going to IDLE then the output watermark will not jump to WM 6. + + // OPS, there is a known bug: https://issues.apache.org/jira/browse/FLINK-18934 + // that prevents this check from succeeding (AbstractStreamOperator and AbstractStreamOperatorV2 + // are ignoring StreamStatus), so those checks needs to be commented out ... + + //expectedOutput.add(new Watermark(initialTime + 5)); + //assertThat(testHarness.getOutput(), contains(expectedOutput.toArray())); + + // and in as a temporary replacement we need this code block: + { + // we wake up the source and emit watermark + addSourceRecords(testHarness, 1, initialTime + 5); + while (testHarness.processSingleStep()) { + } Review comment: Could we extract this loop into a method in the test harness? (Or use the existing `processWhileAvailable`?) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org