[ https://issues.apache.org/jira/browse/FLINK-7009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16064732#comment-16064732 ]
ASF GitHub Bot commented on FLINK-7009: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4188#discussion_r124255898 --- Diff: flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java --- @@ -179,41 +254,130 @@ private String prefix(String ... names) { } } - private void send(final String name, final String value) { + private String buildStatsdLine(final String name, final String value, final String tags) { + Double number; try { - String formatted = String.format("%s:%s|g", name, value); - byte[] data = formatted.getBytes(StandardCharsets.UTF_8); - socket.send(new DatagramPacket(data, data.length, this.address)); + number = Double.parseDouble(value); + } + catch (NumberFormatException e) { + // quietly skip values like "n/a" + return ""; } - catch (IOException e) { - LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort()); + if (number >= 0.) { + return String.format("%s:%s|g%s", name, value, tags != null ? tags : ""); + } else { + // quietly skip "unknowns" like lowWaterMark:-9223372036854775808, or JVM.Memory.NonHeap.Max:-1, or NaN + return ""; } } - @Override - public String filterCharacters(String input) { + private void send(final String name, final String value, final String tags) { + String formatted = buildStatsdLine(name, value, tags); + if (formatted.length() > 0) { + try { + byte[] data = formatted.getBytes(StandardCharsets.UTF_8); + socket.send(new DatagramPacket(data, data.length, this.address)); + } + catch (IOException e) { + LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort()); + } + } + } + + /** + * dogstatsd names should: start with letter, uses ascii alphanumerics and underscore, separated by periods. + * Collapse runs of invalid characters into an underscore. Discard invalid prefix and suffix. 
+ * Eg: ":::metric:::name:::" -> "metric_name" + */ + + private boolean isValidStatsdChar(char c) { + return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || (c == '_'); + } + + private String filterNCharacters(String input, int limit) { char[] chars = null; final int strLen = input.length(); int pos = 0; + boolean insertFiller = false; - for (int i = 0; i < strLen; i++) { + for (int i = 0; i < strLen && pos < limit; i++) { final char c = input.charAt(i); - switch (c) { - case ':': - if (chars == null) { - chars = input.toCharArray(); + if (isValidStatsdChar(c)) { + if (chars != null) { + // skip invalid suffix, only fill if followed by valid character + if (insertFiller) { + chars[pos++] = '_'; + insertFiller = false; } - chars[pos++] = '-'; - break; + chars[pos] = c; + } + pos++; + } else { + if (chars == null) { + chars = input.toCharArray(); + } + // skip invalid prefix, until pos > 0 + if (pos > 0) { + // collapse sequence of invalid char into one filler + insertFiller = true; + } + } + } - default: - if (chars != null) { - chars[pos] = c; - } - pos++; + if (chars == null) { + if (strLen > limit) { + return input.substring(0, limit); + } else { + return input; // happy path, input is entirely valid and under the limit } + } else { + return new String(chars, 0, pos); } + } - return chars == null ? input : new String(chars, 0, pos); + /** + * filterCharacters() is called on each delimited segment of the metric. + * + * <p>We might get a string that has coded structures, references to instances of serializers and reducers, and even if + * we normalize all the odd characters could be overly long for a metric name, likely to be truncated downstream. + * Our choices here appear to be either to discard invalid metrics, or to pragmatically handle each of the various + * issues and produce something that might be useful in aggregate even though the named parts are hard to read. 
+ * + * <p>This function will find and remove all object references like @abcd0123, so that task and operator names are stable. + * The name of an operator should be the same every time it is run, so we should ignore object hash ids like these. + * + * <p>If the segment is a tm_id, task_id, job_id, task_attempt_id, we can optionally trim those to the first 8 chars. + * This can reduce overall length substantially while still preserving enough to distinguish metrics from each other. + * + * <p>If the segment is 50 chars or longer, we will compress it to avoid truncation. The compression will look like the + * first 10 valid chars followed by a hash of the original. This sacrifices readability for utility as a metric, so + * that latency metrics might survive with valid and useful dimensions for aggregation, even if it is very hard to + * reverse engineer the particular operator name. Application developers can of course still supply their own names + * and are not forced to rely on the defaults. + * + * <p>This will turn something like: + * "TriggerWindow(TumblingProcessingTimeWindows(5000), ReducingStateDescriptor{serializer=org.apache.flink.api.java + * .typeutils.runtime.PojoSerializer@f3395ffa, reduceFunction=org.apache.flink.streaming.examples.socket. 
+ * SocketWindowWordCount$1@4201c465}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))" + * + * <p>into: "TriggerWin_c2910b88" + */ + @Override + public String filterCharacters(String input) { + // remove instance references + Matcher hasRefs = instanceRef.matcher(input); + if (hasRefs.find()) { + input = hasRefs.replaceAll(""); + } + // compress segments too long + if (input.length() >= 50) { + return filterNCharacters(input, 10) + "_" + Integer.toHexString(input.hashCode()); + } + int limit = Integer.MAX_VALUE; + // optionally shrink flink ids + if (shortIds && input.length() == 32 && flinkId.matcher(input).matches()) { --- End diff -- I would like to point out that this is an opt-in feature. Also, given that the IDs can be controlled by the user (`SingleOutputStreamOperator#setUid`), you can guarantee uniqueness even with 8 characters. > dogstatsd mode in statsd reporter > --------------------------------- > > Key: FLINK-7009 > URL: https://issues.apache.org/jira/browse/FLINK-7009 > Project: Flink > Issue Type: Improvement > Components: Metrics > Affects Versions: 1.4.0 > Environment: org.apache.flink.metrics.statsd.StatsDReporter > Reporter: David Brinegar > Fix For: 1.4.0 > > > The current statsd reporter can only report a subset of Flink metrics owing > to the manner in which Flink variables are handled, mainly around invalid > characters and metrics too long. As an option, it would be quite useful to > have a stricter dogstatsd compliant output. Dogstatsd metrics are tagged, > should be less than 200 characters including tag names and values, be > alphanumeric + underbar, delimited by periods. As a further pragmatic > restriction, negative and other invalid values should be ignored rather than > sent to the backend. These restrictions play well with a broad set of > collectors and time series databases. > This mode would: > * convert output to ascii alphanumeric characters with underbar, delimited by > periods. 
Runs of invalid characters within a metric segment would be > collapsed to a single underbar. > * report all Flink variables as tags > * compress overly long segments, say over 50 chars, to a symbolic > representation of the metric name, to preserve the unique metric time series > but avoid downstream truncation > * compress 32 character Flink IDs like tm_id, task_id, job_id, > task_attempt_id, to the first 8 characters, again to preserve enough > distinction amongst metrics while trimming up to 96 characters from the metric > * remove object references from names, such as the instance hash id of the > serializer > * drop negative or invalid numeric values such as "n/a", "-1" which is used > for unknowns like JVM.Memory.NonHeap.Max, and "-9223372036854775808" which is > used for unknowns like currentLowWaterMark > With these in place, it becomes quite reasonable to support LatencyGauge > metrics as well. > One idea for symbolic compression is to take the first 10 valid characters > plus a hash of the long name. For example, a value like this operator_name: > {code:java} > TriggerWindow(TumblingProcessingTimeWindows(5000), > ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@f3395ffa, > > reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@4201c465}, > ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301)) > {code} > would first drop the instance references. 
The stable version would be: > > {code:java} > TriggerWindow(TumblingProcessingTimeWindows(5000), > ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer, > > reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1}, > ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301)) > {code} > and then the compressed name would be the first ten valid characters plus the > hash of the stable string: > {code} > TriggerWin_d8c007da > {code} > This is just one way of dealing with unruly default names, the main point > would be to preserve the metrics so they are valid, avoid truncation, and can > be aggregated along other dimensions even if this particular dimension is > hard to parse after the compression. -- This message was sent by Atlassian JIRA (v6.4.14#64029)