[ 
https://issues.apache.org/jira/browse/BEAM-4553?focusedWorklogId=152705&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152705
 ]

ASF GitHub Bot logged work on BEAM-4553:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Oct/18 15:11
            Start Date: 09/Oct/18 15:11
    Worklog Time Spent: 10m 
      Work Description: echauchot commented on a change in pull request #6569: 
[BEAM-4553] Implement graphite sink for MetricsPusher and refactor 
MetricsHttpSink test
URL: https://github.com/apache/beam/pull/6569#discussion_r223744438
 
 

 ##########
 File path: 
runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSink.java
 ##########
 @@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.extensions.metrics;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.nio.charset.Charset;
+import java.util.Locale;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricsSink;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Sink to push metrics to Graphite. Graphite requires a timestamp. So metrics 
are reported with the
+ * timestamp (seconds from epoch) when the push to the sink was done (except 
with gauges that
+ * already have a timestamp value). The graphite metric name will be in the 
form of
+ * 
beam.metricType.metricNamespace.metricName.[committed|attempted].metricValueType
 For example:
+ * {@code beam.counter.throughput.nbRecords.attempted.value} Or {@code
+ * beam.distribution.throughput.nbRecordsPerSec.attempted.mean}
+ */
+public class MetricsGraphiteSink implements MetricsSink {
+  private static final Charset UTF_8 = Charset.forName("UTF-8");
+  private static final Pattern WHITESPACE = Pattern.compile("[\\s]+");
+  private static final String SPACE_REPLACEMENT = "_";
+  private final String address;
+  private final int port;
+  private final Charset charset;
+
+  public MetricsGraphiteSink(PipelineOptions pipelineOptions) {
+    this.address = pipelineOptions.getMetricsGraphiteHost();
+    this.port = pipelineOptions.getMetricsGraphitePort();
+    this.charset = UTF_8;
+  }
+
+  @Experimental(Experimental.Kind.METRICS)
+  @Override
+  public void writeMetrics(MetricQueryResults metricQueryResults) throws 
Exception {
+    final long metricTimestamp = System.currentTimeMillis() / 1000L;
+    Socket socket = new Socket(InetAddress.getByName(address), port);
+    BufferedWriter writer =
+        new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), 
charset));
+    StringBuilder messagePayload = new StringBuilder();
+    Iterable<MetricResult<Long>> counters = metricQueryResults.getCounters();
+    Iterable<MetricResult<GaugeResult>> gauges = 
metricQueryResults.getGauges();
+    Iterable<MetricResult<DistributionResult>> distributions =
+        metricQueryResults.getDistributions();
+
+    for (MetricResult<Long> counter : counters) {
+      // if committed metrics are not supported, exception is thrown and we 
don't append the message
+      try {
+        messagePayload.append(createCounterGraphiteMessage(metricTimestamp, 
counter, true));
+      } catch (UnsupportedOperationException e) {
+        if (!e.getMessage().contains("committed metrics")) {
+          throw e;
+        }
+      }
+      messagePayload.append(createCounterGraphiteMessage(metricTimestamp, 
counter, false));
+    }
+
+    for (MetricResult<GaugeResult> gauge : gauges) {
+      try {
+        messagePayload.append(createGaugeGraphiteMessage(gauge, true));
+      } catch (UnsupportedOperationException e) {
+        if (!e.getMessage().contains("committed metrics")) {
+          throw e;
+        }
+      }
+      messagePayload.append(createGaugeGraphiteMessage(gauge, false));
+    }
+
+    for (MetricResult<DistributionResult> distribution : distributions) {
+      try {
+        messagePayload.append(
+            createDistributionGraphiteMessage(metricTimestamp, distribution, 
true));
+      } catch (UnsupportedOperationException e) {
+        if (!e.getMessage().contains("committed metrics")) {
+          throw e;
+        }
+      }
+      messagePayload.append(
+          createDistributionGraphiteMessage(metricTimestamp, distribution, 
false));
+    }
+    writer.write(messagePayload.toString());
+    writer.flush();
+    writer.close();
+    socket.close();
+  }
+
+  @SuppressFBWarnings(
+    value = "VA_FORMAT_STRING_USES_NEWLINE",
+    justification = "\\n is part of graphite protocol"
+  )
+  private String createCounterGraphiteMessage(
+      long metricTimestamp, MetricResult<Long> counter, boolean 
committedValue) {
+    String metricMessage;
+    if (committedValue) {
+      metricMessage =
+          String.format(
+              Locale.US,
+              "%s %s %s\n",
+              createNormalizedMetricName(
+                  counter, "counter", "value", CommittedOrAttemped.COMMITTED),
+              counter.getCommitted(),
+              metricTimestamp);
+    } else {
+      metricMessage =
+          String.format(
+              Locale.US,
+              "%s %s %s\n",
+              createNormalizedMetricName(
+                  counter, "counter", "value", CommittedOrAttemped.ATTEMPTED),
+              counter.getAttempted(),
+              metricTimestamp);
+    }
+    return metricMessage;
+  }
+
+  @SuppressFBWarnings(
+    value = "VA_FORMAT_STRING_USES_NEWLINE",
+    justification = "\\n is part of graphite protocol"
+  )
+  private String createGaugeGraphiteMessage(
+      MetricResult<GaugeResult> gauge, boolean committedValue) {
+    String metricMessage;
+    if (committedValue) {
+      metricMessage =
+          String.format(
+              Locale.US,
+              "%s %s %s\n",
+              createNormalizedMetricName(gauge, "gauge", "value", 
CommittedOrAttemped.COMMITTED),
+              gauge.getCommitted().getValue(),
+              gauge.getCommitted().getTimestamp().getMillis() / 1000L);
+    } else {
+      metricMessage =
+          String.format(
+              Locale.US,
+              "%s %s %s\n",
+              createNormalizedMetricName(gauge, "gauge", "value", 
CommittedOrAttemped.ATTEMPTED),
+              gauge.getAttempted().getValue(),
+              gauge.getAttempted().getTimestamp().getMillis() / 1000L);
+    }
+    return metricMessage;
+  }
+
+  @SuppressFBWarnings(
+    value = "VA_FORMAT_STRING_USES_NEWLINE",
+    justification = "\\n is part of graphite protocol"
+  )
+  private String createDistributionGraphiteMessage(
+      long metricTimestamp, MetricResult<DistributionResult> distribution, 
boolean committedValue) {
+    StringBuilder messagePayload = new StringBuilder();
+    String metricMessage;
+    if (committedValue) {
+      metricMessage =
+          String.format(
+              Locale.US,
+              "%s %s %s\n",
+              createNormalizedMetricName(
+                  distribution, "distribution", "min", 
CommittedOrAttemped.COMMITTED),
+              distribution.getCommitted().getMin(),
+              metricTimestamp);
+      messagePayload.append(metricMessage);
+      metricMessage =
+          String.format(
+              Locale.US,
+              "%s %s %s\n",
+              createNormalizedMetricName(
+                  distribution, "distribution", "max", 
CommittedOrAttemped.COMMITTED),
+              distribution.getCommitted().getMax(),
+              metricTimestamp);
+      messagePayload.append(metricMessage);
+      metricMessage =
+          String.format(
+              Locale.US,
+              "%s %s %s\n",
+              createNormalizedMetricName(
+                  distribution, "distribution", "count", 
CommittedOrAttemped.COMMITTED),
+              distribution.getCommitted().getCount(),
+              metricTimestamp);
+      messagePayload.append(metricMessage);
+      metricMessage =
+          String.format(
+              Locale.US,
+              "%s %s %s\n",
+              createNormalizedMetricName(
+                  distribution, "distribution", "sum", 
CommittedOrAttemped.COMMITTED),
+              distribution.getCommitted().getSum(),
+              metricTimestamp);
+      messagePayload.append(metricMessage);
+      metricMessage =
+          String.format(
+              Locale.US,
+              "%s %s %s\n",
+              createNormalizedMetricName(
+                  distribution, "distribution", "mean", 
CommittedOrAttemped.COMMITTED),
+              distribution.getCommitted().getMean(),
+              metricTimestamp);
+      messagePayload.append(metricMessage);
+    } else {
+      metricMessage =
+          String.format(
+              Locale.US,
+              "%s %s %s\n",
+              createNormalizedMetricName(
+                  distribution, "distribution", "min", 
CommittedOrAttemped.ATTEMPTED),
+              distribution.getAttempted().getMin(),
+              metricTimestamp);
+      messagePayload.append(metricMessage);
+      metricMessage =
+          String.format(
+              Locale.US,
+              "%s %s %s\n",
+              createNormalizedMetricName(
+                  distribution, "distribution", "max", 
CommittedOrAttemped.ATTEMPTED),
+              distribution.getAttempted().getMax(),
+              metricTimestamp);
+      messagePayload.append(metricMessage);
+      metricMessage =
+          String.format(
+              Locale.US,
+              "%s %s %s\n",
+              createNormalizedMetricName(
+                  distribution, "distribution", "count", 
CommittedOrAttemped.ATTEMPTED),
+              distribution.getAttempted().getCount(),
+              metricTimestamp);
+      messagePayload.append(metricMessage);
+      metricMessage =
+          String.format(
+              Locale.US,
+              "%s %s %s\n",
+              createNormalizedMetricName(
+                  distribution, "distribution", "sum", 
CommittedOrAttemped.ATTEMPTED),
+              distribution.getAttempted().getSum(),
+              metricTimestamp);
+      messagePayload.append(metricMessage);
+      metricMessage =
+          String.format(
+              Locale.US,
+              "%s %s %s\n",
+              createNormalizedMetricName(
+                  distribution, "distribution", "mean", 
CommittedOrAttemped.ATTEMPTED),
+              distribution.getAttempted().getMean(),
+              metricTimestamp);
+      messagePayload.append(metricMessage);
+    }
+    return messagePayload.toString();
+  }
+
+  private <T> String createNormalizedMetricName(
+      MetricResult<T> metric,
+      String metricType,
+      String valueType,
+      CommittedOrAttemped committedOrAttemped) {
+    String metricName =
+        String.format(
+            "beam.%s.%s.%s.%s.%s",
+            metricType,
+            metric.getName().getNamespace(),
+            metric.getName().getName(),
+            committedOrAttemped,
+            valueType);
+
+    return WHITESPACE.matcher(metricName).replaceAll(SPACE_REPLACEMENT);
+  }
+
+  private enum CommittedOrAttemped {
 
 Review comment:
   no, only two type of metrics

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 152705)
    Time Spent: 2h 20m  (was: 2h 10m)

> Implement a Graphite sink for the metrics pusher
> ------------------------------------------------
>
>                 Key: BEAM-4553
>                 URL: https://issues.apache.org/jira/browse/BEAM-4553
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-extensions-metrics
>            Reporter: Etienne Chauchot
>            Assignee: Etienne Chauchot
>            Priority: Major
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Today only a REST Http sink that sends raw json metrics using POST request to 
> a http server is available. It is more a POC sink. It would be good to code 
> the first real metrics sink. Some of the most popular is Graphite.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to