GitHub user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5125#discussion_r154998584
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---
    @@ -80,7 +91,10 @@ public void init() throws Exception {
                                this.headOperator);
     
                // make sure that stream tasks report their I/O statistics
    -           inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
    +           inputProcessor.setupMetrics(getEnvironment().getMetricGroup().getIOMetricGroup(), input1WatermarkGauge, input2WatermarkGauge);
    +
    +           headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, input1WatermarkGauge);
    --- End diff --
    
    The head operator is never touched in the OperatorChain; we only set up the chained operators.

