Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5125#discussion_r155210751
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---
    @@ -80,7 +91,10 @@ public void init() throws Exception {
                                this.headOperator);
     
                // make sure that stream tasks report their I/O statistics
    -           inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
    +           inputProcessor.setupMetrics(getEnvironment().getMetricGroup().getIOMetricGroup(), input1WatermarkGauge, input2WatermarkGauge);
    +
    +           headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, input1WatermarkGauge);
    --- End diff --
    
    However, the separate input1/input2 metrics ignore the
    `WatermarkMetricPreference`: they are never overridden by the output
    metric. That is difficult to do in the OperatorChain, since there we
    have no concept of TwoInputOperators.
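
To illustrate the gap, here is a rough, self-contained sketch. It is not the code from this PR: `Preference`, `WatermarkGauge`, `register` and the metric name strings are all hypothetical stand-ins, and taking the minimum of the two inputs as the combined input watermark is an assumption made only for the example. It shows a general watermark metric that honours the preference next to per-input metrics that always report the raw input gauges.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Illustration only; none of these types are Flink classes. */
public class WatermarkPreferenceSketch {

    /** Hypothetical stand-in for the preference discussed in the comment. */
    enum Preference { INPUT, OUTPUT }

    /** Minimal gauge that remembers the last watermark it was given. */
    static final class WatermarkGauge implements Supplier<Long> {
        private long current = Long.MIN_VALUE;

        void set(long watermark) {
            current = watermark;
        }

        @Override
        public Long get() {
            return current;
        }
    }

    /** Builds a name-to-gauge map; the metric names are placeholders. */
    static Map<String, Supplier<Long>> register(
            Preference preference,
            WatermarkGauge input1,
            WatermarkGauge input2,
            WatermarkGauge output) {
        Map<String, Supplier<Long>> metrics = new LinkedHashMap<>();

        // Assumption: the combined input watermark is the minimum of both inputs.
        Supplier<Long> combinedInput = () -> Math.min(input1.get(), input2.get());

        // The general metric honours the preference and can be replaced by the output gauge.
        metrics.put("watermark", preference == Preference.OUTPUT ? output : combinedInput);

        // The per-input metrics are bound to the input gauges whatever the preference is.
        metrics.put("input1Watermark", input1);
        metrics.put("input2Watermark", input2);
        return metrics;
    }

    public static void main(String[] args) {
        WatermarkGauge in1 = new WatermarkGauge();
        WatermarkGauge in2 = new WatermarkGauge();
        WatermarkGauge out = new WatermarkGauge();
        in1.set(10L);
        in2.set(20L);
        out.set(5L);

        Map<String, Supplier<Long>> metrics = register(Preference.OUTPUT, in1, in2, out);
        metrics.forEach((name, gauge) -> System.out.println(name + " = " + gauge.get()));
    }
}
```

With the `OUTPUT` preference the general metric follows the output gauge, while the two per-input entries keep reporting their own inputs, which is the inconsistency described above.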

