Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5161#discussion_r165000781
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -194,14 +190,20 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<S
                        this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
                        this.output = output;
                }
    -           Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
    -           int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
    -           if (historySize <= 0) {
    -                   LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
    -                   historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
    +
    +           try {
    +                   Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
    +                   int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
    +                   if (historySize <= 0) {
    +                           LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
    +                           historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
    +                   }
    +                   this.latencyStats = new LatencyStats(this.metrics.parent().parent().addGroup("latency"), historySize, container.getIndexInSubtaskGroup(), getOperatorID());
    --- End diff --
    
    I guess what we could do is add another method to the 
`TaskManagerJobMetricGroup` for creating the latency metric group? With that, 
a change to the metric group structure will result in a compile error if 
`parent().parent()` no longer returns the job metric group.
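
A rough sketch of the idea, using simplified stand-in classes rather than
Flink's real metric groups. `JobGroup`, `TaskGroup`, `OperatorGroup` and the
method name `addLatencyGroup()` are all hypothetical and only illustrate the
typing argument: because each `parent()` returns a concrete type, the call
chain compiles only while `parent().parent()` still ends at the job-level
group.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-ins for illustration only, NOT Flink's actual classes.
public class LatencyGroupSketch {

    /** Minimal metric group: a name plus named child groups. */
    static class Group {
        final String name;
        final Map<String, Group> children = new LinkedHashMap<>();

        Group(String name) {
            this.name = name;
        }

        /** Returns the child group with the given name, creating it if absent. */
        Group addGroup(String childName) {
            return children.computeIfAbsent(childName, Group::new);
        }
    }

    /** Stand-in for the job-level metric group. */
    static class JobGroup extends Group {
        JobGroup(String name) {
            super(name);
        }

        // Hypothetical method: only the job-level group offers it.
        Group addLatencyGroup() {
            return addGroup("latency");
        }
    }

    /** Stand-in for the task-level group; its parent is typed as JobGroup. */
    static class TaskGroup extends Group {
        private final JobGroup parent;

        TaskGroup(JobGroup parent, String name) {
            super(name);
            this.parent = parent;
        }

        JobGroup parent() {
            return parent;
        }
    }

    /** Stand-in for the operator-level group; its parent is typed as TaskGroup. */
    static class OperatorGroup extends Group {
        private final TaskGroup parent;

        OperatorGroup(TaskGroup parent, String name) {
            super(name);
            this.parent = parent;
        }

        TaskGroup parent() {
            return parent;
        }
    }

    public static void main(String[] args) {
        JobGroup job = new JobGroup("job");
        TaskGroup task = new TaskGroup(job, "task");
        OperatorGroup operator = new OperatorGroup(task, "operator");

        // Compiles only because parent().parent() is statically a JobGroup.
        Group latency = operator.parent().parent().addLatencyGroup();
        System.out.println(latency.name);
    }
}
```

If another level were later inserted between the task and job groups in this
sketch, `parent().parent()` would have a type without `addLatencyGroup()` and
the call site would stop compiling, instead of silently registering the
latency group under the wrong parent as a plain `addGroup("latency")` would.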

