philipnee commented on code in PR #14620: URL: https://github.com/apache/kafka/pull/14620#discussion_r1388606440
########## clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java: ########## @@ -16,22 +16,135 @@ */ package org.apache.kafka.common.telemetry.internals; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.AggregationTemporality; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; + +import java.time.Instant; +import java.util.Map; +import java.util.concurrent.TimeUnit; + /** - * This class represents a metric that does not yet contain resource tags. + * This class represents a telemetry metric that does not yet contain resource tags. * These additional resource tags will be added before emitting metrics by the telemetry reporter. */ public class SinglePointMetric implements MetricKeyable { private final MetricKey key; + private final Metric.Builder metricBuilder; - private SinglePointMetric(MetricKey key) { + private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) { this.key = key; + this.metricBuilder = metricBuilder; } @Override public MetricKey key() { return key; } - // TODO: Implement methods for serializing/deserializing metrics in required format. + public Metric.Builder builder() { + return metricBuilder; + } + + public static SinglePointMetric create(MetricKey metricKey, Metric.Builder metric) { + return new SinglePointMetric(metricKey, metric); + } + + /* + Methods to construct gauge metric type. + */ + public static SinglePointMetric gauge(MetricKey metricKey, Number value, Instant timestamp) { + NumberDataPoint.Builder point = point(timestamp, value); + return gauge(metricKey, point); + } + + public static SinglePointMetric gauge(MetricKey metricKey, double value, Instant timestamp) { + NumberDataPoint.Builder point = point(timestamp, value); + return gauge(metricKey, point); + } + + /* + Methods to construct sum metric type. + */ + + public static SinglePointMetric sum(MetricKey metricKey, double value, boolean monotonic, Instant timestamp) { + return sum(metricKey, value, monotonic, timestamp, null); + } + + public static SinglePointMetric sum(MetricKey metricKey, double value, boolean monotonic, Instant timestamp, + Instant startTimestamp) { + NumberDataPoint.Builder point = point(timestamp, value); + if (startTimestamp != null) { + point.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp)); + } + + return sum(metricKey, AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, monotonic, point); + } + + public static SinglePointMetric deltaSum(MetricKey metricKey, double value, boolean monotonic, + Instant timestamp, Instant startTimestamp) { + NumberDataPoint.Builder point = point(timestamp, value) + .setStartTimeUnixNano(toTimeUnixNanos(startTimestamp)); + + return sum(metricKey, AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, monotonic, point); + } + + /* + Helper methods to support metric construction. + */ + private static SinglePointMetric sum(MetricKey metricKey, AggregationTemporality aggregationTemporality, + boolean monotonic, NumberDataPoint.Builder point) { + point.addAllAttributes(asAttributes(metricKey.tags())); + + Metric.Builder metric = Metric.newBuilder().setName(metricKey.name()); + metric + .getSumBuilder() + .setAggregationTemporality(aggregationTemporality) + .setIsMonotonic(monotonic) + .addDataPoints(point); + return create(metricKey, metric); + } + + private static SinglePointMetric gauge(MetricKey metricKey, NumberDataPoint.Builder point) { + point.addAllAttributes(asAttributes(metricKey.tags())); + + Metric.Builder metric = Metric.newBuilder().setName(metricKey.name()); + metric.getGaugeBuilder().addDataPoints(point); + return create(metricKey, metric); + } + + private static NumberDataPoint.Builder point(Instant timestamp, Number value) { + if (value instanceof Long || value instanceof Integer) { + return point(timestamp, value.longValue()); + } else { Review Comment: else is not needed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org