[ https://issues.apache.org/jira/browse/FLINK-7009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16064524#comment-16064524 ]
ASF GitHub Bot commented on FLINK-7009:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4188#discussion_r124219351

    --- Diff: flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java ---
    @@ -90,6 +109,45 @@ public void close() {
         // ------------------------------------------------------------------------

    +    /**
    +     * Removes leading and trailing angle brackets.
    +     */
    +    private String stripBrackets(String str) {
    +        return str.substring(1, str.length() - 1);
    +    }
    +
    +    @Override
    +    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
    +        if (dogstatsdMode) {
    +            // memoize dogstatsd tag section: "|#tag:val,tag:val,tag:val"
    +            StringBuilder statsdTagLine = new StringBuilder();
    +            Map<String, String> orderedTags = new TreeMap<>(group.getAllVariables());
    +            for (Map.Entry<String, String> entry: orderedTags.entrySet()) {
    +                String k = stripBrackets(entry.getKey());
    +                String v = filterCharacters(entry.getValue());
    +                statsdTagLine.append(",").append(k).append(":").append(v);
    +            }
    +            if (statsdTagLine.length() > 0) {
    +                // remove first comma, prefix with "|#"
    +                tagTable.put(metric, "|#" + statsdTagLine.substring(1));
    +
    +                String name = metric.getClass().getSimpleName();
    --- End diff --

    this variable isn't used.


> dogstatsd mode in statsd reporter
> ---------------------------------
>
>                 Key: FLINK-7009
>                 URL: https://issues.apache.org/jira/browse/FLINK-7009
>             Project: Flink
>          Issue Type: Improvement
>          Components: Metrics
>    Affects Versions: 1.4.0
>         Environment: org.apache.flink.metrics.statsd.StatsDReporter
>            Reporter: David Brinegar
>             Fix For: 1.4.0
>
>
> The current statsd reporter can only report a subset of Flink metrics owing
> to the manner in which Flink variables are handled, mainly around invalid
> characters and metrics too long. As an option, it would be quite useful to
> have a stricter dogstatsd compliant output.
> Dogstatsd metrics are tagged,
> should be less than 200 characters including tag names and values, be
> alphanumeric + underbar, delimited by periods. As a further pragmatic
> restriction, negative and other invalid values should be ignored rather than
> sent to the backend. These restrictions play well with a broad set of
> collectors and time series databases.
> This mode would:
> * convert output to ASCII alphanumeric characters with underbar, delimited by
> periods. Runs of invalid characters within a metric segment would be
> collapsed to a single underbar.
> * report all Flink variables as tags
> * compress overly long segments, say over 50 chars, to a symbolic
> representation of the metric name, to preserve the unique metric time series
> but avoid downstream truncation
> * compress 32 character Flink IDs like tm_id, task_id, job_id,
> task_attempt_id, to the first 8 characters, again to preserve enough
> distinction amongst metrics while trimming up to 96 characters from the metric
> * remove object references from names, such as the instance hash id of the
> serializer
> * drop negative or invalid numeric values such as "n/a", "-1" which is used
> for unknowns like JVM.Memory.NonHeap.Max, and "-9223372036854775808" which is
> used for unknowns like currentLowWaterMark
> With these in place, it becomes quite reasonable to support LatencyGauge
> metrics as well.
> One idea for symbolic compression is to take the first 10 valid characters
> plus a hash of the long name. For example, a value like this operator_name:
> {code:java}
> TriggerWindow(TumblingProcessingTimeWindows(5000),
> ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@f3395ffa,
> reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@4201c465},
> ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))
> {code}
> would first drop the instance references.
> The stable version would be:
>
> {code:java}
> TriggerWindow(TumblingProcessingTimeWindows(5000),
> ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer,
> reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1},
> ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))
> {code}
> and then the compressed name would be the first ten valid characters plus the
> hash of the stable string:
> {code}
> TriggerWin_d8c007da
> {code}
> This is just one way of dealing with unruly default names, the main point
> would be to preserve the metrics so they are valid, avoid truncation, and can
> be aggregated along other dimensions even if this particular dimension is
> hard to parse after the compression.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
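
For illustration only, the naming rules listed in the issue description could be sketched roughly as below. This is not the code from pull request #4188. The class name DogstatsdNameSketch and all of its method names are hypothetical, the "@" plus hex pattern for instance references is an assumption based on the example operator name, and String.hashCode() is merely a stand-in for the unspecified hash, so the suffix it produces is not expected to match the "d8c007da" shown in the description. The 50-character threshold comes from the "say over 50 chars" wording.

```java
import java.util.regex.Pattern;

/** Hypothetical helper sketching the naming rules proposed in FLINK-7009. */
final class DogstatsdNameSketch {

    // Assumption: instance references look like "@" followed by a hex hash.
    private static final Pattern INSTANCE_REF = Pattern.compile("@[0-9a-fA-F]+");
    // Anything outside ASCII alphanumerics and underscore counts as invalid.
    private static final Pattern INVALID_RUN = Pattern.compile("[^A-Za-z0-9_]+");
    // Flink IDs such as tm_id or job_id are assumed to be 32 hex characters.
    private static final Pattern FLINK_ID = Pattern.compile("[0-9a-fA-F]{32}");
    // "say over 50 chars" in the issue description.
    private static final int MAX_SEGMENT_LENGTH = 50;

    /** Collapses each run of invalid characters to a single underscore. */
    static String sanitizeSegment(String segment) {
        return INVALID_RUN.matcher(segment).replaceAll("_");
    }

    /** Drops object references so the name is stable across restarts. */
    static String stripInstanceRefs(String name) {
        return INSTANCE_REF.matcher(name).replaceAll("");
    }

    /** Shortens a 32-character ID to its first 8 characters. */
    static String shortenId(String value) {
        return FLINK_ID.matcher(value).matches() ? value.substring(0, 8) : value;
    }

    private static boolean isValid(char c) {
        return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
                || (c >= '0' && c <= '9') || c == '_';
    }

    /** Long names become: first 10 valid characters + "_" + 8 hex digits. */
    static String compress(String name) {
        String stable = stripInstanceRefs(name);
        if (stable.length() <= MAX_SEGMENT_LENGTH) {
            return sanitizeSegment(stable);
        }
        StringBuilder prefix = new StringBuilder();
        for (int i = 0; i < stable.length() && prefix.length() < 10; i++) {
            char c = stable.charAt(i);
            if (isValid(c)) {
                prefix.append(c);
            }
        }
        // Stand-in hash; the issue does not say which hash function to use.
        return prefix + "_" + String.format("%08x", stable.hashCode());
    }

    /** Returns false for values like "n/a", "-1" or Long.MIN_VALUE. */
    static boolean isReportable(String value) {
        if (value == null) {
            return false;
        }
        try {
            double d = Double.parseDouble(value.trim());
            return !Double.isNaN(d) && !Double.isInfinite(d) && d >= 0;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(sanitizeSegment("Window(5000) -> Sink"));
        System.out.println(shortenId("0123456789abcdef0123456789abcdef"));
        System.out.println(isReportable("-9223372036854775808"));
    }
}
```

Because the hash is taken over the name after instance references are stripped, two runs of the same job would map to the same compressed name, which is the property the description asks for.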