This is an automated email from the ASF dual-hosted git repository. ningk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 31a0922 [BEAM-14016] Fixed flaky postcommit test (#17009) 31a0922 is described below commit 31a09221d7801e00e87337edee5d0503d6f86b81 Author: Ning Kang <ningkang0...@gmail.com> AuthorDate: Fri Mar 4 12:32:39 2022 -0800 [BEAM-14016] Fixed flaky postcommit test (#17009) Fixed SpannerWriteIntegrationTest.test_spanner_update by fixing the metric exporter usage in spannerio. --- .../apache_beam/io/gcp/experimental/spannerio.py | 27 ++++++++++++++-------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py index e36fbad..39a3a27 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py @@ -156,7 +156,7 @@ Unlike the Java connector, this connector does not create batches of transactions sorted by table and primary key. WriteToSpanner transforms starts with the grouping into batches. The first step -in this process is to make the make the mutation groups of the WriteMutation +in this process is to make the mutation groups of the WriteMutation objects and then filtering them into batchable and unbatchable mutation groups. There are three batching parameters (max_number_cells, max_number_rows & max_batch_size_bytes). 
We calculated the mutation byte size from the method @@ -1202,9 +1202,12 @@ class _WriteToSpannerDoFn(DoFn): monitoring_infos.SPANNER_PROJECT_ID: spanner_configuration.project, monitoring_infos.SPANNER_DATABASE_ID: spanner_configuration.database, } - self.service_metric = None + # table_id to metrics + self.service_metrics = {} - def _table_metric(self, table_id): + def _register_table_metric(self, table_id): + if table_id in self.service_metrics: + return database_id = self._spanner_configuration.database project_id = self._spanner_configuration.project resource = resource_identifiers.SpannerTable( @@ -1217,7 +1220,7 @@ class _WriteToSpannerDoFn(DoFn): service_call_metric = ServiceCallMetric( request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN, base_labels=labels) - return service_call_metric + self.service_metrics[table_id] = service_call_metric def setup(self): spanner_client = Client(self._spanner_configuration.project) @@ -1226,13 +1229,16 @@ class _WriteToSpannerDoFn(DoFn): self._spanner_configuration.database, pool=self._spanner_configuration.pool) + def start_bundle(self): + self.service_metrics = {} + def process(self, element): self.batches.inc() try: with self._db_instance.batch() as b: for m in element: table_id = m.kwargs['table'] - self.service_metric = self._table_metric(table_id) + self._register_table_metric(table_id) if m.operation == WriteMutation._OPERATION_DELETE: batch_func = b.delete @@ -1247,14 +1253,17 @@ class _WriteToSpannerDoFn(DoFn): else: raise ValueError("Unknown operation action: %s" % m.operation) batch_func(**m.kwargs) - - self.service_metric.call('ok') except (ClientError, GoogleAPICallError) as e: - self.service_metric.call(str(e.code.value)) + for service_metric in self.service_metrics.values(): + service_metric.call(str(e.code.value)) raise except HttpError as e: - self.service_metric.call(str(e)) + for service_metric in self.service_metrics.values(): + service_metric.call(str(e)) raise + else: + for service_metric in 
self.service_metrics.values(): + service_metric.call('ok') @with_input_types(typing.Union[MutationGroup, _Mutator])