lindong28 commented on a change in pull request #15161:
URL: https://github.com/apache/flink/pull/15161#discussion_r600297493



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -88,11 +90,15 @@ public void init() throws Exception {
             input = new StreamTaskSourceInput<>(sourceOperator, 0, 0);
         }
 
+        CountingOutput<T> countingOutput =

Review comment:
       Thank you Stephan for the explanation. Yes I agree we should try to 
minimize the number of wrappers on the performance critical path. I have 
updated the PR to move `counter` logic into `AsyncDataOutputToOutput`.
   
   I noticed that 
[StreamTaskSourceOutput](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java#L225)
 currently does not take any metric whereas 
[StreamTaskNetworkOutput](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java#L205),
 whose usage is in the same loop body as `StreamTaskSourceOutput`, takes 
`mainOperatorRecordsIn` as input.
   
   I tried to find the answer in the code. Still it is not clear to me whether 
`StreamTaskSourceOutput` should take metrics (e.g. numRecordsIn, numRecordsOut) 
as input and update them. I will keep the `StreamTaskSourceOutput` as is.
   
   +@becketqin for comments.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to