Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5161#discussion_r165000781 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -194,14 +190,20 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<S this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); this.output = output; } - Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration(); - int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE); - if (historySize <= 0) { - LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize); - historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue(); + + try { + Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration(); + int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE); + if (historySize <= 0) { + LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize); + historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue(); + } + this.latencyStats = new LatencyStats(this.metrics.parent().parent().addGroup("latency"), historySize, container.getIndexInSubtaskGroup(), getOperatorID()); --- End diff -- I guess what we could do add another method to the `TaskManagerJobMetricGroup` for creating the latency metric group? With that, modifications to the metric group structure will result in a compile error if `parent().parent()` does not return us to the job metric group.
---