sjvanrossum commented on code in PR #33408:
URL: https://github.com/apache/beam/pull/33408#discussion_r1906156715
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java:
##########
@@ -345,4 +372,76 @@ public void testConvert_convertCountersAndHistograms() {
parsedMetricNames,
IsMapContaining.hasEntry(histogramMetricName, parsedHistogramMetricName));
}
+
+ @Test
+ public void testConvert_successfulyConvertGauges() {
+ String step = "testStepName";
+ Map<MetricName, LockFreeHistogram.Snapshot> emptyHistograms = new HashMap<>();
+ Map<MetricName, Long> counters = new HashMap<MetricName, Long>();
+ Map<MetricName, Long> gauges = new HashMap<MetricName, Long>();
+ Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedMetricNames = new HashMap<>();
+
+ MetricName KafkaMetric1 = MetricName.named("KafkaSink", "metric1");
+ MetricName KafkaMetric2 = MetricName.named("KafkaSink", "metric2*label1:val1;label2:val2;");
+ MetricName KafkaMetric3 = MetricName.named("KafkaSink", "metric3");
+
+ gauges.put(KafkaMetric1, 5L);
+ gauges.put(KafkaMetric2, 10L);
+ gauges.put(KafkaMetric3, 0L);
+
+ Collection<PerStepNamespaceMetrics> conversionResult =
+ MetricsToPerStepNamespaceMetricsConverter.convert(
+ step, counters, gauges, emptyHistograms, parsedMetricNames);
+
+ DataflowGaugeValue gauge_value1 = new DataflowGaugeValue();
+ gauge_value1.setValue(5L);
+
+ DataflowGaugeValue gauge_value2 = new DataflowGaugeValue();
+ gauge_value2.setValue(10L);
+
+ DataflowGaugeValue gauge_value3 = new DataflowGaugeValue();
+ gauge_value3.setValue(0L); // zero valued
Review Comment:
These variable names should be in lower camel case.
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -71,11 +79,17 @@ abstract class KafkaMetricsImpl implements KafkaMetrics {
abstract HashMap<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies();
+ static ConcurrentHashMap<String, Gauge> backlogGauges = new ConcurrentHashMap<String, Gauge>();
+
+ abstract HashMap<String, Long> perTopicPartitionBacklogs();
Review Comment:
If an instance of this class may be concurrently updated, then `HashMap`
needs to be replaced (ditto for the existing `HashMap` fields). Use
`ConcurrentHashMap` instead.
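For illustration, a minimal sketch of the suggested change, assuming the per-topic maps are written from several threads. `PerTopicMetricsSketch`, `recordRpcLatency`, `recordBacklog`, and the `topic-partitionId` key format are hypothetical and are not the actual `KafkaMetricsImpl` API.

```java
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the per-topic state held by KafkaMetricsImpl.
class PerTopicMetricsSketch {
  // ConcurrentHashMap tolerates concurrent puts and lookups; a plain HashMap
  // can lose updates or corrupt its internal structure under concurrent writes.
  private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies =
      new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, Long> perTopicPartitionBacklogs =
      new ConcurrentHashMap<>();

  void recordRpcLatency(String topic, Duration elapsed) {
    // computeIfAbsent runs atomically per key, so two threads never install
    // different queues for the same topic.
    perTopicRpcLatencies.computeIfAbsent(topic, t -> new ConcurrentLinkedQueue<>()).add(elapsed);
  }

  void recordBacklog(String topic, int partitionId, long backlog) {
    // Last write wins for a given topic/partition key (hypothetical key format).
    perTopicPartitionBacklogs.put(topic + "-" + partitionId, backlog);
  }

  int latencyCount(String topic) {
    ConcurrentLinkedQueue<Duration> queue = perTopicRpcLatencies.get(topic);
    return queue == null ? 0 : queue.size();
  }

  Long backlog(String topic, int partitionId) {
    return perTopicPartitionBacklogs.get(topic + "-" + partitionId);
  }
}
```

The same swap would apply to the existing `HashMap` fields; `ConcurrentLinkedQueue` values are already safe for concurrent `add`.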
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpGauge.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+/**
+ * A no-op implementation of Gauge. This class exists to provide a default if an implementation of
+ * MetricsContainer does not override a Gauge getter.
+ */
+public class NoOpGauge implements Gauge {
+
+ private static final NoOpGauge singleton = new NoOpGauge();
+ private static final MetricName name = MetricName.named(NoOpGauge.class, "singleton");
Review Comment:
Static field names should be in upper snake case.
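For illustration, a minimal sketch of the naming suggestion applied to a no-op singleton. `GaugeSketch` is a hypothetical stand-in for `org.apache.beam.sdk.metrics.Gauge` (the real interface exposes a `MetricName`, simplified here to a `String`), and `getInstance` is an assumed accessor, not necessarily the one in the PR.

```java
// Hypothetical minimal stand-in for Beam's Gauge interface.
interface GaugeSketch {
  void set(long value);

  String getName();
}

// No-op gauge with static final constants in UPPER_SNAKE_CASE, as suggested.
final class NoOpGaugeSketch implements GaugeSketch {
  private static final NoOpGaugeSketch SINGLETON = new NoOpGaugeSketch();
  private static final String NAME = NoOpGaugeSketch.class.getName() + ":singleton";

  private NoOpGaugeSketch() {}

  static NoOpGaugeSketch getInstance() {
    return SINGLETON;
  }

  @Override
  public void set(long value) {
    // Intentionally ignores the value.
  }

  @Override
  public String getName() {
    return NAME;
  }
}
```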
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java:
##########
@@ -40,6 +42,7 @@ public class KafkaSinkMetrics {
// Base Metric names
private static final String RPC_LATENCY = "RpcLatency";
+ private static final String ESTIAMTED_BACKLOG_SIZE = "EstimatedBacklogSize";
Review Comment:
Nit: `ESTIAMTED_BACKLOG_SIZE` -> `ESTIMATED_BACKLOG_SIZE`
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -93,6 +107,21 @@ public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {
}
}
+ /**
+ * @param topicName topicName
+ * @param partitionId partitionId for the topic Only included in the metric key if
+ * 'supportsMetricsDeletion' is enabled.
+ * @param backlog backlog for the topic Only included in the metric key if
+ * 'supportsMetricsDeletion' is enabled.
+ */
Review Comment:
" Only" -> ". Only"?
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java:
##########
@@ -71,6 +75,48 @@ public static Histogram createRPCLatencyHistogram(RpcMethod method, String topic
return new DelegatingHistogram(metricName, buckets, false, true);
}
+ /**
+ * Creates an Gauge metric to record per partition backlog. Metric will have name:
Review Comment:
Nit: `an Gauge` -> `a {@link Gauge}`, ". Metric will have name:" -> " with the name:"
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java:
##########
@@ -71,6 +75,48 @@ public static Histogram createRPCLatencyHistogram(RpcMethod method, String topic
return new DelegatingHistogram(metricName, buckets, false, true);
}
+ /**
+ * Creates an Gauge metric to record per partition backlog. Metric will have name:
+ *
+ * <p>'EstimatedBacklogSize*topic_name:{topic};partitionId:{partitionId};'
+ *
+ * @param topic Kafka topic associated with this metric.
+ * @param partitionId partition id associated with this metric.
+ * @return Counter.
+ */
+ public static Gauge createBacklogGauge(String topic, int partitionId) {
+ return new DelegatingGauge(getMetricGaugeName(topic, partitionId), false, true);
+ }
+
+ /**
+ * Creates an Gauge metric to record per partition backlog. Metric will have name:
Review Comment:
Nit: `an Gauge` -> `a {@link Gauge}`, ". Metric will have name:" -> " with the name:"
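For illustration, a sketch of the suggested Javadoc wording, together with the corrected `ESTIMATED_BACKLOG_SIZE` constant and the documented name format `'EstimatedBacklogSize*topic_name:{topic};partitionId:{partitionId};'`. `BacklogMetricNameSketch` and `backlogMetricName` are hypothetical; the real code presumably builds the name through Beam's labeled-metric-name utilities rather than plain string concatenation.

```java
// Hypothetical helper mirroring the documented backlog gauge name format.
final class BacklogMetricNameSketch {
  private static final String ESTIMATED_BACKLOG_SIZE = "EstimatedBacklogSize";

  private BacklogMetricNameSketch() {}

  /**
   * Builds the name for a {@link Gauge} that records per-partition backlog, with the name:
   *
   * <p>'EstimatedBacklogSize*topic_name:{topic};partitionId:{partitionId};'
   *
   * @param topic Kafka topic associated with this metric.
   * @param partitionId partition id associated with this metric.
   * @return the metric name.
   */
  static String backlogMetricName(String topic, int partitionId) {
    return ESTIMATED_BACKLOG_SIZE + "*topic_name:" + topic + ";partitionId:" + partitionId + ";";
  }
}
```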
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -743,6 +747,16 @@ private void reportBacklog() {
backlogElementsOfSplit.set(splitBacklogMessages);
}
+ private void reportBacklogMetrics() {
Review Comment:
Looks like this can be merged with `reportBacklog` (potentially renaming that
method to `reportBacklogMetrics` or `updateBacklogMetrics`).
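For illustration, a simplified sketch of what a merged `updateBacklogMetrics` could look like: one pass over the partitions that updates both the split-level backlog and the per-partition backlog values. `PartitionBacklog`, `BacklogReporterSketch`, the `AtomicLong` standing in for `backlogElementsOfSplit`, and the map standing in for the per-partition gauges are all hypothetical; the real `PartitionState` and gauge types differ.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified view of a partition's backlog.
final class PartitionBacklog {
  final String topic;
  final int partition;
  final long backlogMessageCount; // negative means unknown in this sketch

  PartitionBacklog(String topic, int partition, long backlogMessageCount) {
    this.topic = topic;
    this.partition = partition;
    this.backlogMessageCount = backlogMessageCount;
  }
}

final class BacklogReporterSketch {
  // Mirrors UnboundedReader.BACKLOG_UNKNOWN (-1L).
  static final long UNKNOWN = -1L;
  // Stands in for the split-level backlog metric.
  final AtomicLong splitBacklog = new AtomicLong();
  // Stands in for the per-partition backlog gauges, keyed "topic:partition".
  final Map<String, Long> perPartitionBacklog = new ConcurrentHashMap<>();

  /** Single pass that updates the split-level and per-partition backlog metrics. */
  void updateBacklogMetrics(List<PartitionBacklog> partitions) {
    long splitBacklogMessages = 0;
    boolean anyUnknown = false;
    for (PartitionBacklog p : partitions) {
      if (p.backlogMessageCount < 0) {
        // Unknown partition: leave its last reported value and mark the split unknown.
        anyUnknown = true;
        continue;
      }
      perPartitionBacklog.put(p.topic + ":" + p.partition, p.backlogMessageCount);
      splitBacklogMessages += p.backlogMessageCount;
    }
    splitBacklog.set(anyUnknown ? UNKNOWN : splitBacklogMessages);
  }
}
```

Whether an unknown partition should keep its last value or be cleared is a design choice the PR would need to settle; the sketch keeps the last value.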
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -302,13 +303,11 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
@Override
public long getSplitBacklogBytes() {
long backlogBytes = 0;
-
for (PartitionState<K, V> p : partitionStates) {
long pBacklog = p.approxBacklogInBytes();
if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
return UnboundedReader.BACKLOG_UNKNOWN;
}
- backlogBytes += pBacklog;
}
return backlogBytes;
Review Comment:
This method should remain as it is, right? According to the docs,
`getSplitBacklogBytes` may affect autoscaling behavior, and this change would
make this override return either 0 or `UnboundedReader.BACKLOG_UNKNOWN` instead
of the split's approximate backlog bytes across assigned partitions or
`UnboundedReader.BACKLOG_UNKNOWN`.
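For illustration, the pre-change logic reduced to a standalone function: any unknown partition makes the whole split unknown, otherwise the per-partition backlogs are summed. `SplitBacklogSketch` and `splitBacklogBytes` are hypothetical names, and a `List<Long>` stands in for iterating `partitionStates` and calling `approxBacklogInBytes()`.

```java
import java.util.List;

final class SplitBacklogSketch {
  // Mirrors UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN (-1L).
  static final long BACKLOG_UNKNOWN = -1L;

  private SplitBacklogSketch() {}

  /** Sums per-partition backlog bytes; one unknown partition makes the split unknown. */
  static long splitBacklogBytes(List<Long> perPartitionBacklogBytes) {
    long backlogBytes = 0;
    for (long pBacklog : perPartitionBacklogBytes) {
      if (pBacklog == BACKLOG_UNKNOWN) {
        return BACKLOG_UNKNOWN;
      }
      // The line removed in the diff; without it the method can only return 0 or unknown.
      backlogBytes += pBacklog;
    }
    return backlogBytes;
  }
}
```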
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -71,11 +79,17 @@ abstract class KafkaMetricsImpl implements KafkaMetrics {
abstract HashMap<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies();
+ static ConcurrentHashMap<String, Gauge> backlogGauges = new ConcurrentHashMap<String, Gauge>();
+
+ abstract HashMap<String, Long> perTopicPartitionBacklogs();
Review Comment:
Slightly unrelated, but why doesn't `perTopicRpcLatencies` use a gauge or
sum as the value type?
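For illustration, a hypothetical sketch of the "sum as the value type" alternative: per-topic latencies aggregated into a running sum and count with `LongAdder` instead of queueing every `Duration`. `LatencySumSketch` and its methods are not part of the PR.

```java
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical alternative: aggregate per-topic RPC latencies instead of storing each sample.
final class LatencySumSketch {
  private final ConcurrentHashMap<String, LongAdder> sums = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, LongAdder> counts = new ConcurrentHashMap<>();

  void record(String topic, Duration elapsed) {
    // LongAdder handles contended concurrent increments cheaply.
    sums.computeIfAbsent(topic, t -> new LongAdder()).add(elapsed.toMillis());
    counts.computeIfAbsent(topic, t -> new LongAdder()).increment();
  }

  long sumMillis(String topic) {
    LongAdder adder = sums.get(topic);
    return adder == null ? 0 : adder.sum();
  }

  long count(String topic) {
    LongAdder adder = counts.get(topic);
    return adder == null ? 0 : adder.sum();
  }
}
```

One trade-off: `createRPCLatencyHistogram` feeds a histogram, which needs individual samples (or bucket counts), whereas a running sum and count only preserves the mean latency.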
--
This is an automated message from the Apache Git Service.