Repository: beam Updated Branches: refs/heads/master 6d627534b -> e58155263
Querying of both structured and unstructured metrics in dataflow. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b98bde91 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b98bde91 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b98bde91 Branch: refs/heads/master Commit: b98bde912c2b2dbcb0ce1d30f6af32b7219d831d Parents: 6d62753 Author: Pablo <pabl...@google.com> Authored: Wed Mar 22 13:22:32 2017 -0700 Committer: Ahmet Altay <al...@google.com> Committed: Mon Mar 27 17:32:32 2017 -0700 ---------------------------------------------------------------------- .../runners/dataflow/dataflow_metrics.py | 86 +++++++++++++++----- .../runners/dataflow/dataflow_metrics_test.py | 63 ++++++++++++-- .../runners/dataflow/dataflow_runner.py | 2 +- 3 files changed, 123 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b98bde91/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py index db5a2bc..f90e3d5 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py @@ -30,25 +30,67 @@ from apache_beam.metrics.metric import MetricResults from apache_beam.metrics.metricbase import MetricName -# TODO(pabloem)(JIRA-1381) Implement this once metrics are queriable from -# dataflow service class DataflowMetrics(MetricResults): """Implementation of MetricResults class for the Dataflow runner.""" - def __init__(self, dataflow_client=None, job_result=None): + def __init__(self, dataflow_client=None, job_result=None, job_graph=None): """Initialize the Dataflow metrics object. Args: dataflow_client: apiclient.DataflowApplicationClient to interact with the dataflow service. job_result: DataflowPipelineResult with the state and id information of - the job + the job. + job_graph: apiclient.Job instance to be able to translate between internal + step names (e.g. "s2"), and user step names (e.g. "split"). """ super(DataflowMetrics, self).__init__() self._dataflow_client = dataflow_client self.job_result = job_result self._queried_after_termination = False self._cached_metrics = None + self._job_graph = job_graph + + def _translate_step_name(self, internal_name): + """Translate between internal step names (e.g. "s1") and user step names.""" + if not self._job_graph: + raise ValueError('Could not translate the internal step name.') + + try: + [step] = [step + for step in self._job_graph.proto.steps + if step.name == internal_name] + [user_step_name] = [prop.value.string_value + for prop in step.properties.additionalProperties + if prop.key == 'user_name'] + except ValueError: + raise ValueError('Could not translate the internal step name.') + return user_step_name + + def _get_metric_key(self, metric): + """Populate the MetricKey object for a queried metric result.""" + try: + # If ValueError is thrown within this try-block, it is because of + # one of the following: + # 1. Unable to translate the step name. Only happening with improperly + # formatted job graph (unlikely), or step name not being the internal + # step name (only happens for unstructured-named metrics). + # 2. Unable to unpack [step] or [namespace]; which should only happen + # for unstructured names. + [step] = [prop.value + for prop in metric.name.context.additionalProperties + if prop.key == 'step'] + step = self._translate_step_name(step) + [namespace] = [prop.value + for prop in metric.name.context.additionalProperties + if prop.key == 'namespace'] + name = metric.name.name + except ValueError: + # An unstructured metric name is "step/namespace/name", but step names + # can (and often do) contain slashes. Must only split on the right-most + # two slashes, to preserve the full step name. + [step, namespace, name] = metric.name.name.rsplit('/', 2) + return MetricKey(step, MetricName(namespace, name)) def _populate_metric_results(self, response): """Take a list of metrics, and convert it to a list of MetricResult.""" @@ -59,29 +101,31 @@ class DataflowMetrics(MetricResults): # Get the tentative/committed versions of every metric together. metrics_by_name = defaultdict(lambda: {}) for metric in user_metrics: - tentative = [prop - for prop in metric.name.context.additionalProperties - if prop.key == 'tentative' and prop.value == 'true'] - key = 'tentative' if tentative else 'committed' - metrics_by_name[metric.name.name][key] = metric - - # Now we create the MetricResult elements. - result = [] - for name, metric in metrics_by_name.iteritems(): - if (name.endswith('(DIST)') or - name.endswith('[MIN]') or - name.endswith('[MAX]') or - name.endswith('[MEAN]') or - name.endswith('[COUNT]')): + if (metric.name.name.endswith('(DIST)') or + metric.name.name.endswith('[MIN]') or + metric.name.name.endswith('[MAX]') or + metric.name.name.endswith('[MEAN]') or + metric.name.name.endswith('[COUNT]')): warn('Distribution metrics will be ignored in the MetricsResult.query' 'method. You can see them in the Dataflow User Interface.') # Distributions are not yet fully supported in this runner continue - [step, namespace, name] = name.split('/') - key = MetricKey(step, MetricName(namespace, name)) + is_tentative = [prop + for prop in metric.name.context.additionalProperties + if prop.key == 'tentative' and prop.value == 'true'] + tentative_or_committed = 'tentative' if is_tentative else 'committed' + + metric_key = self._get_metric_key(metric) + metrics_by_name[metric_key][tentative_or_committed] = metric + + # Now we create the MetricResult elements. + result = [] + for metric_key, metric in metrics_by_name.iteritems(): attempted = metric['tentative'].scalar.integer_value committed = metric['committed'].scalar.integer_value - result.append(MetricResult(key, attempted=attempted, committed=committed)) + result.append(MetricResult(metric_key, + attempted=attempted, + committed=committed)) return result http://git-wip-us.apache.org/repos/asf/beam/blob/b98bde91/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py index 8d18fae..95027a3 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py @@ -18,6 +18,7 @@ Tests corresponding to the DataflowRunner implementation of MetricsResult, the DataflowMetrics class. """ +import types import unittest import mock @@ -43,6 +44,37 @@ class DictToObject(object): class TestDataflowMetrics(unittest.TestCase): + STRUCTURED_COUNTER_LIST = {"metrics": [ + {"name": {"context": + {"additionalProperties": [ + {"key": "namespace", + "value": "__main__.WordExtractingDoFn"}, + {"key": "step", + "value": "s2"}, + {"key": "tentative", + "value": "true"}] + }, + "name": "word_lengths", + "origin": "user" + }, + "scalar": {"integer_value": 109475}, + "updateTime": "2017-03-22T18:47:06.402Z" + }, + {"name": {"context": + {"additionalProperties": [ + {"key": "namespace", + "value": "__main__.WordExtractingDoFn"}, + {"key": "step", + "value": "s2"}] + }, + "name": "word_lengths", + "origin": "user" + }, + "scalar": {"integer_value": 109475}, + "updateTime": "2017-03-22T18:47:06.402Z" + }, + ]} + BASIC_COUNTER_LIST = {"metrics": [ {"name": {"context": {"additionalProperties":[ @@ -71,18 +103,19 @@ class TestDataflowMetrics(unittest.TestCase): "value": "user-split-split/__main__.WordExtractingDoFn/" "words_TentativeAggregateValue"}, {"key": "step", "value": "split"}]}, - "name": "split/__main__.WordExtractingDoFn/words", + "name": "longstepname/split/__main__.WordExtractingDoFn/words", "origin": "user"}, "scalar": {"integer_value": 26181}, "updateTime": "2017-02-23T01:13:36.659Z"}, {"name": {"context": {"additionalProperties": [ {"key": "original_name", - "value": "user-split-split/__main__.WordExtractingDoFn/" + "value": "user-split-longstepname/split/" + "__main__.WordExtractingDoFn/" "words_TentativeAggregateValue"}, {"key": "step", "value": "split"}, {"key": "tentative", "value": "true"}]}, - "name": "split/__main__.WordExtractingDoFn/words", + "name": "longstepname/split/__main__.WordExtractingDoFn/words", "origin": "user"}, "scalar": {"integer_value": 26185}, "updateTime": "2017-02-23T01:13:36.659Z"}, @@ -100,9 +133,12 @@ class TestDataflowMetrics(unittest.TestCase): "updateTime": "2017-02-23T01:13:36.659Z"} ]} - def setup_mock_client_result(self): + def setup_mock_client_result(self, counter_list=None): + if counter_list is None: + counter_list = self.BASIC_COUNTER_LIST + mock_client = mock.Mock() - mock_query_result = DictToObject(self.BASIC_COUNTER_LIST) + mock_query_result = DictToObject(counter_list) mock_client.get_job_metrics.return_value = mock_query_result mock_job_result = mock.Mock() mock_job_result.job_id.return_value = 1 @@ -125,6 +161,21 @@ class TestDataflowMetrics(unittest.TestCase): dm.query() self.assertTrue(dm._cached_metrics) + def test_query_structured_counters(self): + mock_client, mock_job_result = self.setup_mock_client_result( + self.STRUCTURED_COUNTER_LIST) + dm = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result) + dm._translate_step_name = types.MethodType(lambda self, x: 'split', dm) + query_result = dm.query() + expected_counters = [ + MetricResult( + MetricKey('split', + MetricName('__main__.WordExtractingDoFn', + 'word_lengths')), + 109475, 109475), + ] + self.assertEqual(query_result['counters'], expected_counters) + def test_query_counters(self): mock_client, mock_job_result = self.setup_mock_client_result() dm = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result) @@ -135,7 +186,7 @@ class TestDataflowMetrics(unittest.TestCase): MetricName('__main__.WordExtractingDoFn', 'empty_lines')), 1080, 1080), MetricResult( - MetricKey('split', + MetricKey('longstepname/split', MetricName('__main__.WordExtractingDoFn', 'words')), 26181, 26185), ] http://git-wip-us.apache.org/repos/asf/beam/blob/b98bde91/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 25f2fd4..bd29d63 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -174,7 +174,7 @@ class DataflowRunner(PipelineRunner): result = DataflowPipelineResult( self.dataflow_client.create_job(self.job), self) - self._metrics = DataflowMetrics(self.dataflow_client, result) + self._metrics = DataflowMetrics(self.dataflow_client, result, self.job) result.metric_results = self._metrics return result