This is an automated email from the ASF dual-hosted git repository.

ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 31a0922  [BEAM-14016] Fixed flaky postcommit test (#17009)
31a0922 is described below

commit 31a09221d7801e00e87337edee5d0503d6f86b81
Author: Ning Kang <ningkang0...@gmail.com>
AuthorDate: Fri Mar 4 12:32:39 2022 -0800

    [BEAM-14016] Fixed flaky postcommit test (#17009)
    
    Fixed SpannerWriteIntegrationTest.test_spanner_update by fixing the
    metric exporter usage in spannerio.
---
 .../apache_beam/io/gcp/experimental/spannerio.py   | 27 ++++++++++++++--------
 1 file changed, 18 insertions(+), 9 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py
index e36fbad..39a3a27 100644
--- a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py
+++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py
@@ -156,7 +156,7 @@ Unlike the Java connector, this connector does not create batches of
 transactions sorted by table and primary key.
 
 WriteToSpanner transforms starts with the grouping into batches. The first step
-in this process is to make the make the mutation groups of the WriteMutation
+in this process is to make the mutation groups of the WriteMutation
 objects and then filtering them into batchable and unbatchable mutation
 groups. There are three batching parameters (max_number_cells, max_number_rows
 & max_batch_size_bytes). We calculated th mutation byte size from the method
@@ -1202,9 +1202,12 @@ class _WriteToSpannerDoFn(DoFn):
         monitoring_infos.SPANNER_PROJECT_ID: spanner_configuration.project,
         monitoring_infos.SPANNER_DATABASE_ID: spanner_configuration.database,
     }
-    self.service_metric = None
+    # table_id to metrics
+    self.service_metrics = {}
 
-  def _table_metric(self, table_id):
+  def _register_table_metric(self, table_id):
+    if table_id in self.service_metrics:
+      return
     database_id = self._spanner_configuration.database
     project_id = self._spanner_configuration.project
     resource = resource_identifiers.SpannerTable(
@@ -1217,7 +1220,7 @@ class _WriteToSpannerDoFn(DoFn):
     service_call_metric = ServiceCallMetric(
         request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN,
         base_labels=labels)
-    return service_call_metric
+    self.service_metrics[table_id] = service_call_metric
 
   def setup(self):
     spanner_client = Client(self._spanner_configuration.project)
@@ -1226,13 +1229,16 @@ class _WriteToSpannerDoFn(DoFn):
         self._spanner_configuration.database,
         pool=self._spanner_configuration.pool)
 
+  def start_bundle(self):
+    self.service_metrics = {}
+
   def process(self, element):
     self.batches.inc()
     try:
       with self._db_instance.batch() as b:
         for m in element:
           table_id = m.kwargs['table']
-          self.service_metric = self._table_metric(table_id)
+          self._register_table_metric(table_id)
 
           if m.operation == WriteMutation._OPERATION_DELETE:
             batch_func = b.delete
@@ -1247,14 +1253,17 @@ class _WriteToSpannerDoFn(DoFn):
           else:
             raise ValueError("Unknown operation action: %s" % m.operation)
           batch_func(**m.kwargs)
-
-      self.service_metric.call('ok')
     except (ClientError, GoogleAPICallError) as e:
-      self.service_metric.call(str(e.code.value))
+      for service_metric in self.service_metrics.values():
+        service_metric.call(str(e.code.value))
       raise
     except HttpError as e:
-      self.service_metric.call(str(e))
+      for service_metric in self.service_metrics.values():
+        service_metric.call(str(e))
       raise
+    else:
+      for service_metric in self.service_metrics.values():
+        service_metric.call('ok')
 
 
 @with_input_types(typing.Union[MutationGroup, _Mutator])

Reply via email to