Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5125#discussion_r155327970
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 ---
    @@ -80,7 +91,10 @@ public void init() throws Exception {
                                this.headOperator);
     
                // make sure that stream tasks report their I/O statistics
    -           
inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
    +           
inputProcessor.setupMetrics(getEnvironment().getMetricGroup().getIOMetricGroup(),
 input1WatermarkGauge, input2WatermarkGauge);
    +
    +           
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, 
input1WatermarkGauge);
    --- End diff --
    
    yup. sources don't have input watermark metrics, sinks don't have output 
watermarks.
    
    Assigners would work just like any other operator. Assigners overriding the 
input watermark will lead to odd observations: having a source (that emits 
watermarks) followed by a watermark assigner may result in different 
output/input watermarks although they _should_ be identical (source emits 2L, 
but assigner "receives" 4L).
    
    To _me_ this seems more intuitive, and it's definitely less complex :/


---

Reply via email to