[ 
https://issues.apache.org/jira/browse/BEAM-6175?focusedWorklogId=206180&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-206180
 ]

ASF GitHub Bot logged work on BEAM-6175:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Mar/19 02:03
            Start Date: 01/Mar/19 02:03
    Worklog Time Spent: 10m 
      Work Description: DariuszAniszewski commented on pull request #7965: 
[BEAM-6175] Extract reader and publisher from MetricsMonitor in Python load 
tests
URL: https://github.com/apache/beam/pull/7965#discussion_r261457815
 
 

 ##########
 File path: 
sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
 ##########
 @@ -81,120 +83,158 @@ def get_element_by_schema(schema_name, insert_list):
       return element['value']
 
 
-class BigQueryClient(object):
-  def __init__(self, project_name, table, dataset):
-    self._namespace = table
-
-    self._bq_client = bigquery.Client(project=project_name)
-
-    self._schema_names = self._get_schema_names()
-    schema = self._prepare_schema()
-
-    self._get_or_create_table(schema, dataset)
-
-  def match_and_save(self, rows):
-    outputs = self._bq_client.insert_rows(self._bq_table, rows)
-    if len(outputs) > 0:
-      for output in outputs:
-        errors = output['errors']
-        for err in errors:
-          logging.error(err['message'])
-          raise ValueError(
-              'Unable save rows in BigQuery: {}'.format(err['message']))
+class MetricsReader(object):
+  publishers = []
 
-  def _get_dataset(self, dataset_name):
-    bq_dataset_ref = self._bq_client.dataset(dataset_name)
-    try:
-      bq_dataset = self._bq_client.get_dataset(bq_dataset_ref)
-    except NotFound:
-      raise ValueError(
-          'Dataset {} does not exist in your project. '
-          'You have to create table first.'
-          .format(dataset_name))
-    return bq_dataset
+  def __init__(self, project_name=None, bq_table=None, bq_dataset=None):
+    self.publishers.append(ConsoleMetricsPublisher())
+    check = project_name and bq_table and bq_dataset \
+            is not None
+    if check:
+      bq_publisher = BigQueryMetricsPublisher(
+          project_name, bq_table, bq_dataset)
+      self.publishers.append(bq_publisher)
 
-  def _get_or_create_table(self, bq_schemas, dataset):
-    if self._namespace == '':
-      raise ValueError('Namespace cannot be empty.')
+  def publish_metrics(self, result):
+    metrics = result.metrics().query()
+    insert_dicts = self._prepare_all_metrics(metrics)
+    for publisher in self.publishers:
+      publisher.publish(insert_dicts)
 
-    dataset = self._get_dataset(dataset)
-    table_ref = dataset.table(self._namespace)
+  def _prepare_all_metrics(self, metrics):
+    submit_timestamp = time.time()
+    metric_id = uuid.uuid4().hex
 
-    try:
-      self._bq_table = self._bq_client.get_table(table_ref)
-    except NotFound:
-      table = bigquery.Table(table_ref, schema=bq_schemas)
-      self._bq_table = self._bq_client.create_table(table)
+    insert_rows = []
 
-  def _prepare_schema(self):
-    return [SchemaField(**row) for row in SCHEMA]
+    for counter in metrics['counters']:
+      counter_dict = CounterMetric(counter, submit_timestamp, metric_id)\
+        .as_dict()
+      insert_rows.append(counter_dict)
 
-  def _get_schema_names(self):
-    return [schema['name'] for schema in SCHEMA]
+    dists = metrics['distributions']
+    if len(dists) > 0:
+      runtime = RuntimeMetric(dists, submit_timestamp, metric_id)\
+        .as_dict()
+      insert_rows.append(runtime)
 
+    return insert_rows
 
-class MetricsMonitor(object):
-  def __init__(self, project_name, table, dataset):
-    if project_name is not None:
-      self.bq = BigQueryClient(project_name, table, dataset)
 
-  def send_metrics(self, result):
-    metrics = result.metrics().query()
+class Metric(object):
+  value = None
+  label = None
 
-    insert_dicts = self._prepare_all_metrics(metrics)
+  def __init__(self, submit_timestamp, metric_id):
+    self.submit_timestamp = submit_timestamp
+    self.metric_id = metric_id
 
-    self.bq.match_and_save(insert_dicts)
+  def as_dict(self):
 
 Review comment:
   If I understand correctly, an instance of `Metric` represents a single 
metric reading. Thus `Metric` isn't really an iterable, and `as_dict` makes 
more sense to me.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 206180)
    Time Spent: 0.5h  (was: 20m)

> Extract reader and publisher from MetricsMonitor in Python Load Test utils
> --------------------------------------------------------------------------
>
>                 Key: BEAM-6175
>                 URL: https://issues.apache.org/jira/browse/BEAM-6175
>             Project: Beam
>          Issue Type: Improvement
>          Components: testing
>            Reporter: Kasia Kucharczyk
>            Assignee: Kasia Kucharczyk
>            Priority: Minor
>              Labels: triaged
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> To make the load test metrics utilities (load_test_utils.py) more flexible, 
> two classes should be extracted from the MetricsMonitor class:
> * MetricsReader - will only query the metrics by specific names/namespaces 
> and return a Map of keys and metric values for a namespace
> * MetricsPublisher - a set of "Publishers". Each publisher could then 
> publish() to a separate medium (BigQueryClient/console/others)
> This would make it even easier to extend this code in the future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to