[ 
https://issues.apache.org/jira/browse/FLINK-7009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16064732#comment-16064732
 ] 

ASF GitHub Bot commented on FLINK-7009:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4188#discussion_r124255898
  
    --- Diff: 
flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
 ---
    @@ -179,41 +254,130 @@ private String prefix(String ... names) {
                }
        }
     
    -   private void send(final String name, final String value) {
    +   private String buildStatsdLine(final String name, final String value, 
final String tags) {
    +           Double number;
                try {
    -                   String formatted = String.format("%s:%s|g", name, 
value);
    -                   byte[] data = 
formatted.getBytes(StandardCharsets.UTF_8);
    -                   socket.send(new DatagramPacket(data, data.length, 
this.address));
    +                   number = Double.parseDouble(value);
    +           }
    +           catch (NumberFormatException e) {
    +                   // quietly skip values like "n/a"
    +                   return "";
                }
    -           catch (IOException e) {
    -                   LOG.error("unable to send packet to statsd at '{}:{}'", 
address.getHostName(), address.getPort());
    +           if (number >= 0.) {
    +                   return String.format("%s:%s|g%s", name, value, tags != 
null ? tags : "");
    +           } else {
    +                   // quietly skip "unknowns" like 
lowWaterMark:-9223372036854775808, or JVM.Memory.NonHeap.Max:-1, or NaN
    +                   return "";
                }
        }
     
    -   @Override
    -   public String filterCharacters(String input) {
    +   private void send(final String name, final String value, final String 
tags) {
    +           String formatted = buildStatsdLine(name, value, tags);
    +           if (formatted.length() > 0) {
    +                   try {
    +                           byte[] data = 
formatted.getBytes(StandardCharsets.UTF_8);
    +                           socket.send(new DatagramPacket(data, 
data.length, this.address));
    +                   }
    +                   catch (IOException e) {
    +                           LOG.error("unable to send packet to statsd at 
'{}:{}'", address.getHostName(), address.getPort());
    +                   }
    +           }
    +   }
    +
    +   /**
    +   * dogstatsd names should: start with a letter, use ascii alphanumerics 
and underscores, separated by periods.
    +   * Collapse runs of invalid characters into an underscore. Discard 
invalid prefix and suffix.
    +   * Eg: ":::metric:::name:::" ->  "metric_name"
    +   */
    +
    +   private boolean isValidStatsdChar(char c) {
    +           return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c 
>= '0' && c <= '9') || (c == '_');
    +   }
    +
    +   private String filterNCharacters(String input, int limit) {
                char[] chars = null;
                final int strLen = input.length();
                int pos = 0;
    +           boolean insertFiller = false;
     
    -           for (int i = 0; i < strLen; i++) {
    +           for (int i = 0; i < strLen && pos < limit; i++) {
                        final char c = input.charAt(i);
    -                   switch (c) {
    -                           case ':':
    -                                   if (chars == null) {
    -                                           chars = input.toCharArray();
    +                   if (isValidStatsdChar(c)) {
    +                           if (chars != null) {
    +                                   // skip invalid suffix, only fill if 
followed by valid character
    +                                   if (insertFiller) {
    +                                           chars[pos++] = '_';
    +                                           insertFiller = false;
                                        }
    -                                   chars[pos++] = '-';
    -                                   break;
    +                                   chars[pos] = c;
    +                           }
    +                           pos++;
    +                   } else {
    +                           if (chars == null) {
    +                                   chars = input.toCharArray();
    +                           }
    +                           // skip invalid prefix, until pos > 0
    +                           if (pos > 0) {
    +                                   // collapse sequence of invalid char 
into one filler
    +                                   insertFiller = true;
    +                           }
    +                   }
    +           }
     
    -                           default:
    -                                   if (chars != null) {
    -                                           chars[pos] = c;
    -                                   }
    -                                   pos++;
    +           if (chars == null) {
    +                   if (strLen > limit) {
    +                           return input.substring(0, limit);
    +                   } else {
    +                           return input; // happy path, input is entirely 
valid and under the limit
                        }
    +           } else {
    +                   return new String(chars, 0, pos);
                }
    +   }
     
    -           return chars == null ? input : new String(chars, 0, pos);
    +   /**
    +    * filterCharacters() is called on each delimited segment of the metric.
    +    *
    +    * <p>We might get a string that has coded structures, references to 
instances of serializers and reducers, and even if
    +    * we normalize all the odd characters could be overly long for a 
metric name, likely to be truncated downstream.
    +    * Our choices here appear to be either to discard invalid metrics, or 
to pragmatically handle each of the various
    +    * issues and produce something that might be useful in aggregate even 
though the named parts are hard to read.
    +    *
    +    * <p>This function will find and remove all object references like 
@abcd0123, so that task and operator names are stable.
    +    * The name of an operator should be the same every time it is run, so 
we should ignore object hash ids like these.
    +    *
    +    * <p>If the segment is a tm_id, task_id, job_id, task_attempt_id, we 
can optionally trim those to the first 8 chars.
    +    * This can reduce overall length substantially while still preserving 
enough to distinguish metrics from each other.
    +    *
    +    * <p>If the segment is 50 chars or longer, we will compress it to 
avoid truncation. The compression will look like the
    +    * first 10 valid chars followed by a hash of the original.  This 
sacrifices readability for utility as a metric, so
    +    * that latency metrics might survive with valid and useful dimensions 
for aggregation, even if it is very hard to
    +    * reverse engineer the particular operator name.  Application 
developers can of course still supply their own names
    +    * and are not forced to rely on the defaults.
    +    *
    +    * <p>This will turn something like:
    +    *              "TriggerWindow(TumblingProcessingTimeWindows(5000), 
ReducingStateDescriptor{serializer=org.apache.flink.api.java
    +    *              .typeutils.runtime.PojoSerializer@f3395ffa, 
reduceFunction=org.apache.flink.streaming.examples.socket.
    +    *              SocketWindowWordCount$1@4201c465}, 
ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))"
    +    *
    +    * <p>into:  "TriggerWin_c2910b88"
    +    */
    +   @Override
    +   public String filterCharacters(String input) {
    +           // remove instance references
    +           Matcher hasRefs = instanceRef.matcher(input);
    +           if (hasRefs.find()) {
    +                   input = hasRefs.replaceAll("");
    +           }
    +           // compress segments too long
    +           if (input.length() >= 50) {
    +                   return filterNCharacters(input, 10) + "_" + 
Integer.toHexString(input.hashCode());
    +           }
    +           int limit = Integer.MAX_VALUE;
    +           // optionally shrink flink ids
    +           if (shortIds && input.length() == 32 && 
flinkId.matcher(input).matches()) {
    --- End diff --
    
    I would like to point out that this is an opt-in feature. Also, given that the 
IDs can be controlled by the user (`SingleOutputStreamOperator#setUid`), you can 
guarantee uniqueness even with 8 characters.


> dogstatsd mode in statsd reporter
> ---------------------------------
>
>                 Key: FLINK-7009
>                 URL: https://issues.apache.org/jira/browse/FLINK-7009
>             Project: Flink
>          Issue Type: Improvement
>          Components: Metrics
>    Affects Versions: 1.4.0
>         Environment: org.apache.flink.metrics.statsd.StatsDReporter
>            Reporter: David Brinegar
>             Fix For: 1.4.0
>
>
> The current statsd reporter can only report a subset of Flink metrics owing 
> to the manner in which Flink variables are handled, mainly around invalid 
> characters and metrics too long.  As an option, it would be quite useful to 
> have a stricter dogstatsd compliant output.  Dogstatsd metrics are tagged, 
> should be less than 200 characters including tag names and values, be 
> alphanumeric + underbar, delimited by periods.  As a further pragmatic 
> restriction, negative and other invalid values should be ignored rather than 
> sent to the backend.  These restrictions play well with a broad set of 
> collectors and time series databases.
> This mode would:
> * convert output to ascii alphanumeric characters with underbar, delimited by 
> periods.  Runs of invalid characters within a metric segment would be 
> collapsed to a single underbar.
> * report all Flink variables as tags
> * compress overly long segments, say over 50 chars, to a symbolic 
> representation of the metric name, to preserve the unique metric time series 
> but avoid downstream truncation
> * compress 32 character Flink IDs like tm_id, task_id, job_id, 
> task_attempt_id, to the first 8 characters, again to preserve enough 
> distinction amongst metrics while trimming up to 96 characters from the metric
> * remove object references from names, such as the instance hash id of the 
> serializer
> * drop negative or invalid numeric values such as "n/a", "-1" which is used 
> for unknowns like JVM.Memory.NonHeap.Max, and "-9223372036854775808" which is 
> used for unknowns like currentLowWaterMark
> With these in place, it becomes quite reasonable to support LatencyGauge 
> metrics as well.
> One idea for symbolic compression is to take the first 10 valid characters 
> plus a hash of the long name.  For example, a value like this operator_name:
> {code:java}
> TriggerWindow(TumblingProcessingTimeWindows(5000), 
> ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@f3395ffa,
>  
> reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@4201c465},
>  ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))
> {code}
> would first drop the instance references.  The stable version would be:
>  
> {code:java}
> TriggerWindow(TumblingProcessingTimeWindows(5000), 
> ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer,
>  
> reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1},
>  ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))
> {code}
> and then the compressed name would be the first ten valid characters plus the 
> hash of the stable string:
> {code}
> TriggerWin_d8c007da
> {code}
> This is just one way of dealing with unruly default names, the main point 
> would be to preserve the metrics so they are valid, avoid truncation, and can 
> be aggregated along other dimensions even if this particular dimension is 
> hard to parse after the compression.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to