philipnee commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1388606440


##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java:
##########
@@ -16,22 +16,135 @@
  */
 package org.apache.kafka.common.telemetry.internals;
 
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 /**
- * This class represents a metric that does not yet contain resource tags.
+ * This class represents a telemetry metric that does not yet contain resource 
tags.
  * These additional resource tags will be added before emitting metrics by the 
telemetry reporter.
  */
 public class SinglePointMetric implements MetricKeyable {
 
     private final MetricKey key;
+    private final Metric.Builder metricBuilder;
 
-    private SinglePointMetric(MetricKey key) {
+    private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) {
         this.key = key;
+        this.metricBuilder = metricBuilder;
     }
 
     @Override
     public MetricKey key() {
         return key;
     }
 
-    // TODO: Implement methods for serializing/deserializing metrics in 
required format.
+    public Metric.Builder builder() {
+        return metricBuilder;
+    }
+
+    public static SinglePointMetric create(MetricKey metricKey, Metric.Builder 
metric) {
+        return new SinglePointMetric(metricKey, metric);
+    }
+
+    /*
+        Methods to construct gauge metric type.
+     */
+    public static SinglePointMetric gauge(MetricKey metricKey, Number value, 
Instant timestamp) {
+        NumberDataPoint.Builder point = point(timestamp, value);
+        return gauge(metricKey, point);
+    }
+
+    public static SinglePointMetric gauge(MetricKey metricKey, double value, 
Instant timestamp) {
+        NumberDataPoint.Builder point = point(timestamp, value);
+        return gauge(metricKey, point);
+    }
+
+    /*
+        Methods to construct sum metric type.
+     */
+
+    public static SinglePointMetric sum(MetricKey metricKey, double value, 
boolean monotonic, Instant timestamp) {
+        return sum(metricKey, value, monotonic, timestamp, null);
+    }
+
+    public static SinglePointMetric sum(MetricKey metricKey, double value, 
boolean monotonic, Instant timestamp,
+        Instant startTimestamp) {
+        NumberDataPoint.Builder point = point(timestamp, value);
+        if (startTimestamp != null) {
+            point.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+        }
+
+        return sum(metricKey, 
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, monotonic, point);
+    }
+
+    public static SinglePointMetric deltaSum(MetricKey metricKey, double 
value, boolean monotonic,
+        Instant timestamp, Instant startTimestamp) {
+        NumberDataPoint.Builder point = point(timestamp, value)
+            .setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+
+        return sum(metricKey, 
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, monotonic, point);
+    }
+
+    /*
+        Helper methods to support metric construction.
+     */
+    private static SinglePointMetric sum(MetricKey metricKey, 
AggregationTemporality aggregationTemporality,
+        boolean monotonic, NumberDataPoint.Builder point) {
+        point.addAllAttributes(asAttributes(metricKey.tags()));
+
+        Metric.Builder metric = Metric.newBuilder().setName(metricKey.name());
+        metric
+            .getSumBuilder()
+            .setAggregationTemporality(aggregationTemporality)
+            .setIsMonotonic(monotonic)
+            .addDataPoints(point);
+        return create(metricKey, metric);
+    }
+
+    private static SinglePointMetric gauge(MetricKey metricKey, 
NumberDataPoint.Builder point) {
+        point.addAllAttributes(asAttributes(metricKey.tags()));
+
+        Metric.Builder metric = Metric.newBuilder().setName(metricKey.name());
+        metric.getGaugeBuilder().addDataPoints(point);
+        return create(metricKey, metric);
+    }
+
+    private static NumberDataPoint.Builder point(Instant timestamp, Number 
value) {
+        if (value instanceof Long || value instanceof Integer) {
+            return point(timestamp, value.longValue());
+        } else {

Review Comment:
   else is not needed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to