sjvanrossum commented on code in PR #33408:
URL: https://github.com/apache/beam/pull/33408#discussion_r1906156715


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java:
##########
@@ -345,4 +372,76 @@ public void testConvert_convertCountersAndHistograms() {
         parsedMetricNames,
        IsMapContaining.hasEntry(histogramMetricName, parsedHistogramMetricName));
   }
+
+  @Test
+  public void testConvert_successfulyConvertGauges() {
+    String step = "testStepName";
+    Map<MetricName, LockFreeHistogram.Snapshot> emptyHistograms = new HashMap<>();
+    Map<MetricName, Long> counters = new HashMap<MetricName, Long>();
+    Map<MetricName, Long> gauges = new HashMap<MetricName, Long>();
+    Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedMetricNames = new HashMap<>();
+
+    MetricName KafkaMetric1 = MetricName.named("KafkaSink", "metric1");
+    MetricName KafkaMetric2 = MetricName.named("KafkaSink", "metric2*label1:val1;label2:val2;");
+    MetricName KafkaMetric3 = MetricName.named("KafkaSink", "metric3");
+
+    gauges.put(KafkaMetric1, 5L);
+    gauges.put(KafkaMetric2, 10L);
+    gauges.put(KafkaMetric3, 0L);
+
+    Collection<PerStepNamespaceMetrics> conversionResult =
+        MetricsToPerStepNamespaceMetricsConverter.convert(
+            step, counters, gauges, emptyHistograms, parsedMetricNames);
+
+    DataflowGaugeValue gauge_value1 = new DataflowGaugeValue();
+    gauge_value1.setValue(5L);
+
+    DataflowGaugeValue gauge_value2 = new DataflowGaugeValue();
+    gauge_value2.setValue(10L);
+
+    DataflowGaugeValue gauge_value3 = new DataflowGaugeValue();
+    gauge_value3.setValue(0L); // zero valued

Review Comment:
   These variable names should be in lower camel case.
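
   A minimal sketch of the rename, with `DataflowGaugeValue` stubbed as a plain value holder (a stand-in for the real Dataflow worker class, just to make the naming concrete):

   ```java
   // Sketch of the rename: Java locals use lower camel case, so
   // gauge_value1 -> gaugeValue1. DataflowGaugeValue is stubbed here as a
   // minimal value holder standing in for the real Dataflow worker class.
   public class GaugeNamingExample {
     static class DataflowGaugeValue {
       private Long value;
       void setValue(Long v) { this.value = v; }
       Long getValue() { return value; }
     }

     public static void main(String[] args) {
       DataflowGaugeValue gaugeValue1 = new DataflowGaugeValue();
       gaugeValue1.setValue(5L);

       DataflowGaugeValue gaugeValue2 = new DataflowGaugeValue();
       gaugeValue2.setValue(10L);

       DataflowGaugeValue gaugeValue3 = new DataflowGaugeValue();
       gaugeValue3.setValue(0L); // zero valued

       System.out.println(gaugeValue1.getValue() + " " + gaugeValue2.getValue() + " " + gaugeValue3.getValue());
     }
   }
   ```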



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -71,11 +79,17 @@ abstract class KafkaMetricsImpl implements KafkaMetrics {
 
     abstract HashMap<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies();
 
+    static ConcurrentHashMap<String, Gauge> backlogGauges = new ConcurrentHashMap<String, Gauge>();
+
+    abstract HashMap<String, Long> perTopicPartitionBacklogs();

Review Comment:
   If an instance of this class may be concurrently updated, then `HashMap` needs to be replaced (ditto for the existing `HashMap` fields). Use `ConcurrentHashMap` instead.
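
   A sketch of the suggestion, assuming an instance really can be touched by multiple threads (`TopicBacklogs` is a hypothetical stand-in for the abstract `perTopicPartitionBacklogs()` accessor):

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Back the per-topic map with ConcurrentHashMap: reads are thread-safe
   // and per-key writes are atomic, unlike a plain HashMap, which can be
   // corrupted by concurrent structural modification.
   public class TopicBacklogs {
     private final Map<String, Long> perTopicPartitionBacklogs = new ConcurrentHashMap<>();

     public void recordBacklog(String topicPartition, long backlog) {
       // put() on ConcurrentHashMap is safe under concurrent writers.
       perTopicPartitionBacklogs.put(topicPartition, backlog);
     }

     public Long backlogFor(String topicPartition) {
       return perTopicPartitionBacklogs.get(topicPartition);
     }
   }
   ```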



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpGauge.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+/**
+ * A no-op implementation of Gauge. This class exists to provide a default if an implementation of
+ * MetricsContainer does not override a Gauge getter.
+ */
+public class NoOpGauge implements Gauge {
+
+  private static final NoOpGauge singleton = new NoOpGauge();
+  private static final MetricName name = MetricName.named(NoOpGauge.class, "singleton");

Review Comment:
   Static field names should be in upper snake case.
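
   Hypothetical corrected declarations (with `MetricName` reduced to a `String` so the sketch is self-contained):

   ```java
   // Static final constants in Java use upper snake case:
   // singleton -> SINGLETON, name -> NAME.
   public class NoOpGaugeSketch {
     private static final NoOpGaugeSketch SINGLETON = new NoOpGaugeSketch();
     private static final String NAME = "singleton"; // stand-in for MetricName

     private NoOpGaugeSketch() {}

     public static NoOpGaugeSketch instance() { return SINGLETON; }

     public String name() { return NAME; }

     public void set(long value) {
       // no-op by design
     }
   }
   ```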



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java:
##########
@@ -40,6 +42,7 @@ public class KafkaSinkMetrics {
 
   // Base Metric names
   private static final String RPC_LATENCY = "RpcLatency";
+  private static final String ESTIAMTED_BACKLOG_SIZE = "EstimatedBacklogSize";

Review Comment:
   Nit: `ESTIAMTED_BACKLOG_SIZE` -> `ESTIMATED_BACKLOG_SIZE`



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -93,6 +107,21 @@ public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {
       }
     }
 
+    /**
+     * @param topicName topicName
+     * @param partitionId partitionId for the topic Only included in the metric key if
+     *     'supportsMetricsDeletion' is enabled.
+     * @param backlog backlog for the topic Only included in the metric key if
+     *     'supportsMetricsDeletion' is enabled.
+     */

Review Comment:
   " Only" -> ". Only"?



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java:
##########
@@ -71,6 +75,48 @@ public static Histogram createRPCLatencyHistogram(RpcMethod method, String topic
     return new DelegatingHistogram(metricName, buckets, false, true);
   }
 
+  /**
+   * Creates an Gauge metric to record per partition backlog. Metric will have name:

Review Comment:
   Nit: `an Gauge` -> `a {@link Gauge}`, ". Metric will have name:" -> " with the name:"



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java:
##########
@@ -71,6 +75,48 @@ public static Histogram createRPCLatencyHistogram(RpcMethod method, String topic
     return new DelegatingHistogram(metricName, buckets, false, true);
   }
 
+  /**
+   * Creates an Gauge metric to record per partition backlog. Metric will have name:
+   *
+   * <p>'EstimatedBacklogSize*topic_name:{topic};partitionId:{partitionId};'
+   *
+   * @param topic Kafka topic associated with this metric.
+   * @param partitionId partition id associated with this metric.
+   * @return Counter.
+   */
+  public static Gauge createBacklogGauge(String topic, int partitionId) {
+    return new DelegatingGauge(getMetricGaugeName(topic, partitionId), false, true);
+  }
+
+  /**
+   * Creates an Gauge metric to record per partition backlog. Metric will have name:

Review Comment:
   Nit: `an Gauge` -> `a {@link Gauge}`, ". Metric will have name:" -> " with the name:"



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -743,6 +747,16 @@ private void reportBacklog() {
     backlogElementsOfSplit.set(splitBacklogMessages);
   }
 
+  private void reportBacklogMetrics() {

Review Comment:
   Looks like this can be merged with `reportBacklog` (potentially renaming that method to `reportBacklogMetrics` or `updateBacklogMetrics`).
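
   A hedged sketch of what the merged method could look like. Every name here (`perPartitionBacklog`, `splitBacklogBytes`, the map-valued parameter) is a hypothetical stand-in for the reader's actual fields and helpers:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // One method doing both jobs: the new per-partition metric updates and
   // the existing split-level backlog accounting from reportBacklog.
   public class BacklogReporterSketch {
     final Map<Integer, Long> perPartitionBacklog = new HashMap<>();
     long splitBacklogBytes;

     void updateBacklogMetrics(Map<Integer, Long> latestBacklogs) {
       long total = 0;
       for (Map.Entry<Integer, Long> e : latestBacklogs.entrySet()) {
         // Per-partition update (reportBacklogMetrics' job in this PR).
         perPartitionBacklog.put(e.getKey(), e.getValue());
         total += e.getValue();
       }
       // Split-level total (reportBacklog's existing job).
       splitBacklogBytes = total;
     }
   }
   ```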



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -302,13 +303,11 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
   @Override
   public long getSplitBacklogBytes() {
     long backlogBytes = 0;
-
     for (PartitionState<K, V> p : partitionStates) {
       long pBacklog = p.approxBacklogInBytes();
       if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
         return UnboundedReader.BACKLOG_UNKNOWN;
       }
-      backlogBytes += pBacklog;
     }
 
     return backlogBytes;

Review Comment:
   This method should remain as it is, right? `getSplitBacklogBytes` may affect autoscaling behavior according to the docs, and this change would have the override return either 0 or `UnboundedReader.BACKLOG_UNKNOWN`, instead of the split's approximate backlog bytes across its assigned partitions (or `UnboundedReader.BACKLOG_UNKNOWN`).
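
   For reference, a self-contained sketch of the pre-change behavior, with `PartitionState` reduced to a `long[]` of per-partition byte estimates. The accumulation line is exactly the one the diff removes:

   ```java
   // Keep summing per-partition backlog; return BACKLOG_UNKNOWN as soon
   // as any partition's estimate is unknown.
   public class SplitBacklogExample {
     static final long BACKLOG_UNKNOWN = -1L; // mirrors UnboundedReader.BACKLOG_UNKNOWN

     static long getSplitBacklogBytes(long[] partitionBacklogs) {
       long backlogBytes = 0;
       for (long pBacklog : partitionBacklogs) {
         if (pBacklog == BACKLOG_UNKNOWN) {
           return BACKLOG_UNKNOWN;
         }
         backlogBytes += pBacklog; // the accumulation removed by the diff
       }
       return backlogBytes;
     }
   }
   ```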



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -71,11 +79,17 @@ abstract class KafkaMetricsImpl implements KafkaMetrics {
 
     abstract HashMap<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies();
 
+    static ConcurrentHashMap<String, Gauge> backlogGauges = new ConcurrentHashMap<String, Gauge>();
+
+    abstract HashMap<String, Long> perTopicPartitionBacklogs();

Review Comment:
   Slightly unrelated, but why doesn't `perTopicRpcLatencies` use a gauge or sum as the value type?


