This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f0a53ba040 initialize and increment metrics properly (#24592)
1f0a53ba040 is described below

commit 1f0a53ba0408a9d45aec7bf61724a1d42984a6ea
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Fri Dec 9 13:25:51 2022 -0500

    initialize and increment metrics properly (#24592)
---
 .../gcp/bigquery/BigQueryStorageStreamSource.java  | 39 +++++++++++++---------
 1 file changed, 24 insertions(+), 15 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
index 6dda7706c4d..812524615ce 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
 import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.BigQueryServerStream;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
+import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -167,6 +168,24 @@ class BigQueryStorageStreamSource<T> extends 
BoundedSource<T> {
     private @Nullable TableReference tableReference;
     private @Nullable ServiceCallMetric serviceCallMetric;
 
+    // Initialize metrics.
+    private final Counter totalSplitCalls =
+        Metrics.counter(BigQueryStorageStreamReader.class, 
"split-at-fraction-calls");
+    private final Counter impossibleSplitPointCalls =
+        Metrics.counter(
+            BigQueryStorageStreamReader.class,
+            "split-at-fraction-calls-failed-due-to-impossible-split-point");
+    private final Counter badSplitPointCalls =
+        Metrics.counter(
+            BigQueryStorageStreamReader.class,
+            "split-at-fraction-calls-failed-due-to-bad-split-point");
+    private final Counter otherFailedSplitCalls =
+        Metrics.counter(
+            BigQueryStorageStreamReader.class,
+            "split-at-fraction-calls-failed-due-to-other-reasons");
+    private final Counter successfulSplitCalls =
+        Metrics.counter(BigQueryStorageStreamReader.class, 
"split-at-fraction-calls-successful");
+
     private BigQueryStorageStreamReader(
         BigQueryStorageStreamSource<T> source, BigQueryOptions options) throws 
IOException {
       this.source = source;
@@ -305,7 +324,7 @@ class BigQueryStorageStreamSource<T> extends 
BoundedSource<T> {
       // @RequiresNonNull
       Preconditions.checkStateNotNull(responseStream);
       BigQueryServerStream<ReadRowsResponse> responseStream = 
this.responseStream;
-      Metrics.counter(BigQueryStorageStreamReader.class, 
"split-at-fraction-calls").inc();
+      totalSplitCalls.inc();
       LOG.debug(
           "Received BigQuery Storage API split request for stream {} at 
fraction {}.",
           source.readStream.getName(),
@@ -329,10 +348,7 @@ class BigQueryStorageStreamSource<T> extends 
BoundedSource<T> {
       SplitReadStreamResponse splitResponse = 
storageClient.splitReadStream(splitRequest);
       if (!splitResponse.hasPrimaryStream() || 
!splitResponse.hasRemainderStream()) {
         // No more splits are possible!
-        Metrics.counter(
-                BigQueryStorageStreamReader.class,
-                "split-at-fraction-calls-failed-due-to-impossible-split-point")
-            .inc();
+        impossibleSplitPointCalls.inc();
         LOG.info(
             "BigQuery Storage API stream {} cannot be split at {}.",
             source.readStream.getName(),
@@ -363,10 +379,7 @@ class BigQueryStorageStreamSource<T> extends 
BoundedSource<T> {
         } catch (FailedPreconditionException e) {
           // The current source has already moved past the split point, so 
this split attempt
           // is unsuccessful.
-          Metrics.counter(
-                  BigQueryStorageStreamReader.class,
-                  "split-at-fraction-calls-failed-due-to-bad-split-point")
-              .inc();
+          badSplitPointCalls.inc();
           LOG.info(
               "BigQuery Storage API split of stream {} abandoned because the 
primary stream is to "
                   + "the left of the split fraction {}.",
@@ -374,10 +387,7 @@ class BigQueryStorageStreamSource<T> extends 
BoundedSource<T> {
               fraction);
           return null;
         } catch (Exception e) {
-          Metrics.counter(
-                  BigQueryStorageStreamReader.class,
-                  "split-at-fraction-calls-failed-due-to-other-reasons")
-              .inc();
+          otherFailedSplitCalls.inc();
           LOG.error("BigQuery Storage API stream split failed.", e);
           return null;
         }
@@ -390,8 +400,7 @@ class BigQueryStorageStreamSource<T> extends 
BoundedSource<T> {
         reader.resetBuffer();
       }
 
-      Metrics.counter(BigQueryStorageStreamReader.class, 
"split-at-fraction-calls-successful")
-          .inc();
+      successfulSplitCalls.inc();
       LOG.info(
           "Successfully split BigQuery Storage API stream at {}. Split 
response: {}",
           fraction,

Reply via email to