Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5161#discussion_r165000781
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
---
@@ -194,14 +190,20 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<S
this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
this.output = output;
}
- Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
- int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
- if (historySize <= 0) {
- LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
- historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
+
+ try {
+ Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
+ int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
+ if (historySize <= 0) {
+ LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
+ historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
+ }
+ this.latencyStats = new LatencyStats(this.metrics.parent().parent().addGroup("latency"), historySize, container.getIndexInSubtaskGroup(), getOperatorID());
--- End diff --
I guess what we could do is add another method to the
`TaskManagerJobMetricGroup` for creating the latency metric group? With that,
modifications to the metric group structure will result in a compile error if
`parent().parent()` no longer returns us to the job metric group.
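
A rough sketch of the idea, using simplified stand-in classes rather than the actual Flink metric groups. `JobGroup`, `TaskGroup`, `OperatorGroup` and the method name `addLatencyGroup()` are all hypothetical and only illustrate the typing argument: because the method exists only on the job-level group, the `parent().parent()` chain stops compiling as soon as it resolves to any other group type.

```java
// Hypothetical, simplified stand-ins for the metric group hierarchy.
// None of these classes or method names are taken from Flink itself.
class Group {
    private final String name;

    Group(String name) {
        this.name = name;
    }

    // Generic child-group creation, available on every group.
    Group addGroup(String child) {
        return new Group(name + "." + child);
    }

    String getName() {
        return name;
    }
}

// Job-level group: the only type that offers addLatencyGroup().
final class JobGroup extends Group {
    JobGroup(String jobName) {
        super(jobName);
    }

    // Assumed name for the proposed factory method.
    Group addLatencyGroup() {
        return addGroup("latency");
    }
}

final class TaskGroup extends Group {
    private final JobGroup parent;

    TaskGroup(JobGroup parent, String taskName) {
        super(parent.getName() + "." + taskName);
        this.parent = parent;
    }

    // Statically typed parent: callers get a JobGroup, not a plain Group.
    JobGroup parent() {
        return parent;
    }
}

final class OperatorGroup extends Group {
    private final TaskGroup parent;

    OperatorGroup(TaskGroup parent, String operatorName) {
        super(parent.getName() + "." + operatorName);
        this.parent = parent;
    }

    TaskGroup parent() {
        return parent;
    }
}

class LatencyGroupSketch {
    public static void main(String[] args) {
        JobGroup job = new JobGroup("job");
        OperatorGroup operator = new OperatorGroup(new TaskGroup(job, "task"), "operator");

        // Compiles only while parent().parent() is statically a JobGroup.
        // If another level were inserted into the hierarchy, this line
        // would fail to compile instead of silently registering the
        // latency group under the wrong parent.
        Group latency = operator.parent().parent().addLatencyGroup();
        System.out.println(latency.getName()); // prints "job.latency"
    }
}
```

With plain `addGroup("latency")`, which every group offers, the same chain would keep compiling after a structural change and the latency metrics would simply end up under a different scope.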
---