This is an automated email from the ASF dual-hosted git repository. johncasey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 1f0a53ba040 initialize and increment metrics properly (#24592) 1f0a53ba040 is described below commit 1f0a53ba0408a9d45aec7bf61724a1d42984a6ea Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Fri Dec 9 13:25:51 2022 -0500 initialize and increment metrics properly (#24592) --- .../gcp/bigquery/BigQueryStorageStreamSource.java | 39 +++++++++++++--------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java index 6dda7706c4d..812524615ce 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.BigQueryServerStream; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient; +import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -167,6 +168,24 @@ class BigQueryStorageStreamSource<T> extends BoundedSource<T> { private @Nullable TableReference tableReference; private @Nullable ServiceCallMetric serviceCallMetric; + // Initialize metrics. + private final Counter totalSplitCalls = + Metrics.counter(BigQueryStorageStreamReader.class, "split-at-fraction-calls"); + private final Counter impossibleSplitPointCalls = + Metrics.counter( + BigQueryStorageStreamReader.class, + "split-at-fraction-calls-failed-due-to-impossible-split-point"); + private final Counter badSplitPointCalls = + Metrics.counter( + BigQueryStorageStreamReader.class, + "split-at-fraction-calls-failed-due-to-bad-split-point"); + private final Counter otherFailedSplitCalls = + Metrics.counter( + BigQueryStorageStreamReader.class, + "split-at-fraction-calls-failed-due-to-other-reasons"); + private final Counter successfulSplitCalls = + Metrics.counter(BigQueryStorageStreamReader.class, "split-at-fraction-calls-successful"); + private BigQueryStorageStreamReader( BigQueryStorageStreamSource<T> source, BigQueryOptions options) throws IOException { this.source = source; @@ -305,7 +324,7 @@ class BigQueryStorageStreamSource<T> extends BoundedSource<T> { // @RequiresNonNull Preconditions.checkStateNotNull(responseStream); BigQueryServerStream<ReadRowsResponse> responseStream = this.responseStream; - Metrics.counter(BigQueryStorageStreamReader.class, "split-at-fraction-calls").inc(); + totalSplitCalls.inc(); LOG.debug( "Received BigQuery Storage API split request for stream {} at fraction {}.", source.readStream.getName(), @@ -329,10 +348,7 @@ class BigQueryStorageStreamSource<T> extends BoundedSource<T> { SplitReadStreamResponse splitResponse = storageClient.splitReadStream(splitRequest); if (!splitResponse.hasPrimaryStream() || !splitResponse.hasRemainderStream()) { // No more splits are possible! - Metrics.counter( - BigQueryStorageStreamReader.class, - "split-at-fraction-calls-failed-due-to-impossible-split-point") - .inc(); + impossibleSplitPointCalls.inc(); LOG.info( "BigQuery Storage API stream {} cannot be split at {}.", source.readStream.getName(), @@ -363,10 +379,7 @@ class BigQueryStorageStreamSource<T> extends BoundedSource<T> { } catch (FailedPreconditionException e) { // The current source has already moved past the split point, so this split attempt // is unsuccessful. - Metrics.counter( - BigQueryStorageStreamReader.class, - "split-at-fraction-calls-failed-due-to-bad-split-point") - .inc(); + badSplitPointCalls.inc(); LOG.info( "BigQuery Storage API split of stream {} abandoned because the primary stream is to " + "the left of the split fraction {}.", @@ -374,10 +387,7 @@ class BigQueryStorageStreamSource<T> extends BoundedSource<T> { fraction); return null; } catch (Exception e) { - Metrics.counter( - BigQueryStorageStreamReader.class, - "split-at-fraction-calls-failed-due-to-other-reasons") - .inc(); + otherFailedSplitCalls.inc(); LOG.error("BigQuery Storage API stream split failed.", e); return null; } @@ -390,8 +400,7 @@ class BigQueryStorageStreamSource<T> extends BoundedSource<T> { reader.resetBuffer(); } - Metrics.counter(BigQueryStorageStreamReader.class, "split-at-fraction-calls-successful") - .inc(); + successfulSplitCalls.inc(); LOG.info( "Successfully split BigQuery Storage API stream at {}. Split response: {}", fraction,