scwhittle commented on code in PR #33503:
URL: https://github.com/apache/beam/pull/33503#discussion_r1981552498


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -146,14 +154,30 @@ private void recordRpcLatencyMetrics() {
       }
     }
 
-    private void recordBacklogBytes() {
+    private void recordBacklogBytesInternal() {
       for (Map.Entry<String, Long> backlogs : perTopicPartitionBacklogs().entrySet()) {
         Gauge gauge =
             KafkaSinkMetrics.createBacklogGauge(MetricName.named("KafkaSink", backlogs.getKey()));
         gauge.set(backlogs.getValue());
       }
     }
 
+    /**
+     * This is for recording backlog bytes on the current thread.
+     *
+     * @param topicName topicName
+     * @param partitionId partitionId for the topic Only included in the metric key if
+     *     'supportsMetricsDeletion' is enabled.
+     * @param backlogBytes backlog for the topic Only included in the metric key if
+     *     'supportsMetricsDeletion' is enabled.
+     */
+    @Override
+    public void recordBacklogBytes(String topicName, int partitionId, long backlogBytes) {

Review Comment:
   From this code alone, it is hard to tell how
   `KafkaSinkMetrics.createBacklogGauge(MetricName.named("KafkaSink", backlogs.getKey()));`
   and
   `Metrics.gauge(KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId));`
   differ.
   
   From looking into the code, one of them sets perWorkerMetric when creating
   the DelegatingMetric. This is confusing because what we report here is still
   a per-worker metric; that fact is just carried in the MetricName.
   
   Could we change things to be consistent? For example, what if KafkaMetrics
   always set the per-worker label, and the Dataflow metric container then
   looked for that label and handled those metrics differently if it wants to,
   instead of having the separate perWorkerGauge method?
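
To make the suggestion concrete, here is a minimal, self-contained sketch of the idea. It does not use Beam's real classes: `LabeledName`, `Container`, `backlogName`, and the `PER_WORKER_METRIC` label key are all hypothetical stand-ins, assumed only for illustration. The point is that the IO always attaches the label and the container alone decides which reporting path to use.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class PerWorkerLabelSketch {
  // Hypothetical label key; not a confirmed Beam constant.
  public static final String PER_WORKER_LABEL = "PER_WORKER_METRIC";

  /** Hypothetical stand-in for a metric name that can carry labels. */
  public static final class LabeledName {
    public final String namespace;
    public final String name;
    public final Map<String, String> labels;

    public LabeledName(String namespace, String name, Map<String, String> labels) {
      this.namespace = namespace;
      this.name = name;
      this.labels = Collections.unmodifiableMap(new HashMap<>(labels));
    }

    public boolean isPerWorker() {
      return "true".equals(labels.get(PER_WORKER_LABEL));
    }

    public String key() {
      return namespace + ":" + name;
    }
  }

  /** Hypothetical container: the runner side, not the IO, picks the path. */
  public static final class Container {
    public final Map<String, Long> perWorkerGauges = new HashMap<>();
    public final Map<String, Long> regularGauges = new HashMap<>();
    private final boolean supportsPerWorkerPath;

    public Container(boolean supportsPerWorkerPath) {
      this.supportsPerWorkerPath = supportsPerWorkerPath;
    }

    public void setGauge(LabeledName name, long value) {
      // Route on the label; containers without a special path ignore it.
      if (name.isPerWorker() && supportsPerWorkerPath) {
        perWorkerGauges.put(name.key(), value);
      } else {
        regularGauges.put(name.key(), value);
      }
    }
  }

  /** The IO side always attaches the label, whichever runner is in use. */
  public static LabeledName backlogName(String topic, int partitionId) {
    Map<String, String> labels = new HashMap<>();
    labels.put(PER_WORKER_LABEL, "true");
    return new LabeledName("KafkaSink", topic + "-" + partitionId, labels);
  }

  public static void main(String[] args) {
    LabeledName name = backlogName("orders", 3);
    Container withSpecialPath = new Container(true);
    Container withoutSpecialPath = new Container(false);
    withSpecialPath.setGauge(name, 1024L);
    withoutSpecialPath.setGauge(name, 1024L);
    System.out.println("special path: " + withSpecialPath.perWorkerGauges);
    System.out.println("regular path: " + withoutSpecialPath.regularGauges);
  }
}
```

With this shape the metrics class makes the same call regardless of runner or SDF/non-SDF implementation, and only the container needs to know whether a separate per-worker path exists.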
   
   It seems complicated that KafkaSinkMetrics has to know about the two
   possible reporting paths and which one works with which runner. For
   example, what if the non-SDF Kafka impl is run on v2 via the SDF wrapper?
   Or what if we supported the SDF impl on v1?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
