Repository: beam Updated Branches: refs/heads/python-sdk 8bf4c8059 -> d1906416e
[BEAM-147] Adding Metrics API to Python SDK Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/88bad042 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/88bad042 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/88bad042 Branch: refs/heads/python-sdk Commit: 88bad042a0de8be89786c3e1c909f18b3be088f5 Parents: 8bf4c80 Author: Pablo <pabl...@google.com> Authored: Tue Nov 22 14:31:26 2016 -0800 Committer: bchambers <bchamb...@google.com> Committed: Thu Dec 29 16:02:31 2016 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/metrics/__init__.py | 17 + sdks/python/apache_beam/metrics/cells.py | 315 +++++++++++++++++++ sdks/python/apache_beam/metrics/cells_test.py | 143 +++++++++ sdks/python/apache_beam/metrics/execution.py | 210 +++++++++++++ .../apache_beam/metrics/execution_test.py | 131 ++++++++ sdks/python/apache_beam/metrics/metric.py | 165 ++++++++++ sdks/python/apache_beam/metrics/metric_test.py | 85 +++++ sdks/python/apache_beam/metrics/metricbase.py | 81 +++++ .../runners/direct/direct_metrics.py | 112 +++++++ .../runners/direct/direct_metrics_test.py | 211 +++++++++++++ .../apache_beam/runners/direct/direct_runner.py | 5 + .../runners/direct/evaluation_context.py | 9 + .../apache_beam/runners/direct/executor.py | 9 + .../runners/direct/transform_result.py | 4 + sdks/python/apache_beam/runners/runner_test.py | 43 +++ 15 files changed, 1540 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/metrics/__init__.py b/sdks/python/apache_beam/metrics/__init__.py new file mode 100644 index 0000000..164d1a8 --- /dev/null +++ b/sdks/python/apache_beam/metrics/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the 
Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from apache_beam.metrics.metric import Metrics http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/cells.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py new file mode 100644 index 0000000..5a571f5 --- /dev/null +++ b/sdks/python/apache_beam/metrics/cells.py @@ -0,0 +1,315 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This file contains metric cell classes. A metric cell is used to accumulate
in-memory changes to a metric. It represents a specific metric in a single
context.

Cells depend on a 'dirty-bit' in the CellCommitState class that tracks whether
a cell's updates have been committed.
"""

import threading

from apache_beam.metrics.metricbase import Counter
from apache_beam.metrics.metricbase import Distribution


class CellCommitState(object):
  """Atomically tracks a cell's dirty/clean commit status.

  Reporting a metric update works in a two-step process: First, updates to the
  metric are received, and the metric is marked as 'dirty'. Later, updates are
  committed, and then the cell may be marked as 'clean'.

  The tracking of a cell's state is done conservatively: A metric may be
  reported DIRTY even if updates have not occurred.

  This class is thread-safe.
  """

  # Indicates that there have been changes to the cell since the last commit.
  DIRTY = 0
  # Indicates that there have NOT been changes to the cell since last commit.
  CLEAN = 1
  # Indicates that a commit of the current value is in progress.
  COMMITTING = 2

  def __init__(self):
    """Initializes ``CellCommitState``.

    A cell is initialized as dirty.
    """
    self._lock = threading.Lock()
    self._state = CellCommitState.DIRTY

  @property
  def state(self):
    """Returns the current state (DIRTY, CLEAN or COMMITTING)."""
    with self._lock:
      return self._state

  def after_modification(self):
    """Indicate that changes have been made to the metric being tracked.

    Should be called after modification of the metric value.
    """
    with self._lock:
      self._state = CellCommitState.DIRTY

  def after_commit(self):
    """Mark changes made up to the last call to ``before_commit`` as committed.

    The next call to ``before_commit`` will return ``False`` unless there have
    been changes made.
    """
    with self._lock:
      # Only a cell that is still COMMITTING becomes CLEAN; if it was modified
      # (and therefore set back to DIRTY) during the commit, it stays DIRTY.
      if self._state == CellCommitState.COMMITTING:
        self._state = CellCommitState.CLEAN

  def before_commit(self):
    """Check the dirty state, and mark the metric as committing.

    After this call, the state is either CLEAN, or COMMITTING. If the state
    was already CLEAN, then we simply return. If it was either DIRTY or
    COMMITTING, then we set the cell as COMMITTING (e.g. in the middle of
    a commit).

    After a commit is successful, ``after_commit`` should be called.

    Returns:
      A boolean, which is false if the cell is CLEAN, and true otherwise.
    """
    with self._lock:
      if self._state == CellCommitState.CLEAN:
        return False
      else:
        self._state = CellCommitState.COMMITTING
        return True


class MetricCell(object):
  """Accumulates in-memory changes to a metric.

  A MetricCell represents a specific metric in a single context and bundle.
  All subclasses must be thread safe, as these are used in the pipeline runners,
  and may be subject to parallel/concurrent updates. Cells should only be used
  directly within a runner.
  """
  def __init__(self):
    self.commit = CellCommitState()
    self._lock = threading.Lock()

  def get_cumulative(self):
    """Returns the cumulative value of the cell; implemented by subclasses."""
    raise NotImplementedError


class CounterCell(Counter, MetricCell):
  """Tracks the current value and delta of a counter metric.

  Each cell tracks the state of a metric independently per context per bundle.
  Therefore, each metric has a different cell in each bundle, cells are
  aggregated by the runner.

  This class is thread safe.
  """
  def __init__(self, *args):
    super(CounterCell, self).__init__(*args)
    self.value = 0

  def combine(self, other):
    """Returns a new ``CounterCell`` holding the sum of both cells' values."""
    result = CounterCell()
    result.inc(self.value + other.value)
    return result

  def inc(self, n=1):
    """Adds ``n`` to the counter and marks the cell as modified."""
    with self._lock:
      self.value += n
      self.commit.after_modification()

  def get_cumulative(self):
    """Returns the current value of the counter."""
    with self._lock:
      return self.value


class DistributionCell(Distribution, MetricCell):
  """Tracks the current value and delta for a distribution metric.

  Each cell tracks the state of a metric independently per context per bundle.
  Therefore, each metric has a different cell in each bundle, that is later
  aggregated.

  This class is thread safe.
  """
  def __init__(self, *args):
    super(DistributionCell, self).__init__(*args)
    self.data = DistributionData(0, 0, None, None)

  def combine(self, other):
    """Returns a new ``DistributionCell`` with the combined data of both."""
    result = DistributionCell()
    result.data = self.data.combine(other.data)
    return result

  def update(self, value):
    """Records ``value`` in the distribution and marks the cell as modified."""
    with self._lock:
      # Modify first, then flag the cell, as ``after_modification`` documents
      # and as ``CounterCell.inc`` does. Both steps happen under the cell lock.
      self._update(value)
      self.commit.after_modification()

  def _update(self, value):
    # Values are truncated to integers: distributions are integer-only.
    value = int(value)
    self.data.count += 1
    self.data.sum += value
    self.data.min = (value
                     if self.data.min is None or self.data.min > value
                     else self.data.min)
    self.data.max = (value
                     if self.data.max is None or self.data.max < value
                     else self.data.max)

  def get_cumulative(self):
    """Returns a ``DistributionData`` copy of the current data."""
    with self._lock:
      return self.data.get_cumulative()


class DistributionResult(object):
  """The result of a Distribution metric.
  """
  def __init__(self, data):
    self.data = data

  def __eq__(self, other):
    return self.data == other.data

  def __ne__(self, other):
    # Defined explicitly so that ``!=`` agrees with ``__eq__``.
    return not self.__eq__(other)

  @property
  def max(self):
    return self.data.max

  @property
  def min(self):
    return self.data.min

  @property
  def count(self):
    return self.data.count

  @property
  def sum(self):
    return self.data.sum

  @property
  def mean(self):
    """Returns the float mean of the distribution.

    If the distribution contains no elements, it returns None.
    """
    if self.data.count == 0:
      return None
    else:
      return float(self.data.sum)/self.data.count


class DistributionData(object):
  """The data structure that holds data about a distribution metric.

  Distribution metrics are restricted to distributions of integers only.

  This object is not thread safe, so it's not supposed to be modified
  by other than the DistributionCell that contains it.
  """
  def __init__(self, sum, count, min, max):
    self.sum = sum
    self.count = count
    self.min = min
    self.max = max

  def __eq__(self, other):
    return (self.sum == other.sum and
            self.count == other.count and
            self.min == other.min and
            self.max == other.max)

  def __ne__(self, other):
    # The special method for ``!=`` is ``__ne__``; it was previously
    # misspelled ``__neq__`` and therefore never used by the operator.
    return not self.__eq__(other)

  # Old (misspelled) name kept so that explicit callers keep working.
  __neq__ = __ne__

  def __repr__(self):
    return '<DistributionData({}, {}, {}, {})>'.format(self.sum,
                                                       self.count,
                                                       self.min,
                                                       self.max)

  def get_cumulative(self):
    """Returns a new ``DistributionData`` with the same field values."""
    return DistributionData(self.sum, self.count, self.min, self.max)

  def combine(self, other):
    """Returns the combination of this data with ``other``.

    If ``other`` is None, this object itself is returned (not a copy).
    A ``None`` min/max means "no value yet" and is ignored when the other
    side has a value.
    """
    if other is None:
      return self
    else:
      new_min = (None if self.min is None and other.min is None else
                 min(x for x in (self.min, other.min) if x is not None))
      new_max = (None if self.max is None and other.max is None else
                 max(x for x in (self.max, other.max) if x is not None))
      return DistributionData(
          self.sum + other.sum,
          self.count + other.count,
          new_min,
          new_max)

  @classmethod
  def singleton(cls, value):
    """Returns the ``DistributionData`` of a single observed ``value``."""
    return DistributionData(value, 1, value, value)


class MetricAggregator(object):
  """Base interface for aggregating metric data during pipeline execution."""
  def zero(self):
    """Returns the identity value for ``combine``."""
    raise NotImplementedError

  def combine(self, x, y):
    """Returns the combination of two values.

    The signature matches the concrete aggregators in this file, which all
    take two values.
    """
    raise NotImplementedError

  def result(self, x):
    """Converts an aggregated value into its final result form."""
    raise NotImplementedError


class CounterAggregator(MetricAggregator):
  """Aggregator for Counter metric data during pipeline execution.

  Values aggregated should be ``int`` objects.
  """
  def zero(self):
    return 0

  def combine(self, x, y):
    return int(x) + int(y)

  def result(self, x):
    return int(x)


class DistributionAggregator(MetricAggregator):
  """Aggregator for Distribution metric data during pipeline execution.

  Values aggregated should be ``DistributionData`` objects.
  """
  def zero(self):
    return DistributionData(0, 0, None, None)

  def combine(self, x, y):
    return x.combine(y)

  def result(self, x):
    return DistributionResult(x.get_cumulative())

# http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/cells_test.py
# ----------------------------------------------------------------------
# diff --git a/sdks/python/apache_beam/metrics/cells_test.py b/sdks/python/apache_beam/metrics/cells_test.py
# new file mode 100644
# index 0000000..a4c8a43
# --- /dev/null
# +++ b/sdks/python/apache_beam/metrics/cells_test.py
# @@ -0,0 +1,143 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import threading
import unittest

from apache_beam.metrics.cells import CounterCell
from apache_beam.metrics.cells import DistributionCell
from apache_beam.metrics.cells import DistributionData
from apache_beam.metrics.cells import CellCommitState


class TestCounterCell(unittest.TestCase):
  """Tests for ``CounterCell``."""

  @classmethod
  def _modify_counter(cls, d):
    # Thread body: increments the shared cell by 0, 1, ..., NUM_ITERATIONS-1.
    for i in range(cls.NUM_ITERATIONS):
      d.inc(i)

  NUM_THREADS = 5
  NUM_ITERATIONS = 100

  def test_parallel_access(self):
    # We create NUM_THREADS threads that concurrently modify the counter.
    threads = []
    c = CounterCell()
    for _ in range(TestCounterCell.NUM_THREADS):
      t = threading.Thread(target=TestCounterCell._modify_counter,
                           args=(c,))
      threads.append(t)
      t.start()

    for t in threads:
      t.join()

    # Floor division keeps the expected total an integer under true division.
    total = (self.NUM_ITERATIONS * (self.NUM_ITERATIONS - 1) // 2
             * self.NUM_THREADS)
    self.assertEqual(c.get_cumulative(), total)

  def test_basic_operations(self):
    c = CounterCell()
    c.inc(2)
    self.assertEqual(c.get_cumulative(), 2)

    c.dec(10)
    self.assertEqual(c.get_cumulative(), -8)

    c.dec()
    self.assertEqual(c.get_cumulative(), -9)

    c.inc()
    self.assertEqual(c.get_cumulative(), -8)


class TestDistributionCell(unittest.TestCase):
  """Tests for ``DistributionCell``."""

  @classmethod
  def _modify_distribution(cls, d):
    # Thread body: records 0, 1, ..., NUM_ITERATIONS-1 in the shared cell.
    for i in range(cls.NUM_ITERATIONS):
      d.update(i)

  NUM_THREADS = 5
  NUM_ITERATIONS = 100

  def test_parallel_access(self):
    # We create NUM_THREADS threads that concurrently modify the distribution.
    threads = []
    d = DistributionCell()
    for _ in range(TestDistributionCell.NUM_THREADS):
      t = threading.Thread(target=TestDistributionCell._modify_distribution,
                           args=(d,))
      threads.append(t)
      t.start()

    for t in threads:
      t.join()

    # Floor division keeps the expected sum an integer under true division.
    total = (self.NUM_ITERATIONS * (self.NUM_ITERATIONS - 1) // 2
             * self.NUM_THREADS)

    count = (self.NUM_ITERATIONS * self.NUM_THREADS)

    self.assertEqual(d.get_cumulative(),
                     DistributionData(total, count, 0,
                                      self.NUM_ITERATIONS - 1))

  def test_basic_operations(self):
    d = DistributionCell()
    d.update(10)
    self.assertEqual(d.get_cumulative(),
                     DistributionData(10, 1, 10, 10))

    d.update(2)
    self.assertEqual(d.get_cumulative(),
                     DistributionData(12, 2, 2, 10))

    d.update(900)
    self.assertEqual(d.get_cumulative(),
                     DistributionData(912, 3, 2, 900))

  def test_integer_only(self):
    # Non-integer values are truncated with int() before being recorded.
    d = DistributionCell()
    d.update(3.1)
    d.update(3.2)
    d.update(3.3)
    self.assertEqual(d.get_cumulative(),
                     DistributionData(9, 3, 3, 3))


class TestCellCommitState(unittest.TestCase):
  """Tests for the DIRTY / COMMITTING / CLEAN transitions."""

  def test_basic_path(self):
    ds = CellCommitState()
    # Starts dirty
    self.assertTrue(ds.before_commit())
    ds.after_commit()
    self.assertFalse(ds.before_commit())

    # Make it dirty again
    ds.after_modification()
    self.assertTrue(ds.before_commit())
    ds.after_commit()
    self.assertFalse(ds.before_commit())

    # Dirty again
    ds.after_modification()
    self.assertTrue(ds.before_commit())
    # A modification during the commit must keep the cell dirty afterwards.
    ds.after_modification()
    ds.after_commit()
    self.assertTrue(ds.before_commit())


if __name__ == '__main__':
  unittest.main()

# http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/execution.py
# ----------------------------------------------------------------------
# diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py
# new file mode 100644
# index 0000000..8f04b7b
# --- /dev/null
# +++ b/sdks/python/apache_beam/metrics/execution.py
# @@ -0,0 +1,210 @@
#
# Licensed to
# the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Internal classes for Metrics API.

The classes in this file keep shared state, and organize metrics information.

Available classes:
- MetricKey - Internal key for a metric.
- MetricResult - Current status of a metric's updates/commits.
- MetricsEnvironment - Keeps track of MetricsContainer and other metrics
    information for every single execution working thread.
- MetricsContainer - Holds the metrics of a single step and a single
    unit-of-commit (bundle).
"""
from collections import defaultdict
import threading

from apache_beam.metrics.cells import CounterCell, DistributionCell


class MetricKey(object):
  """Key used to identify instance of metric cell.

  Metrics are internally keyed by the step name they associated with and
  the name of the metric.
  """
  def __init__(self, step, metric):
    """Initializes ``MetricKey``.

    Args:
      step: A string with the step this metric cell is part of.
      metric: A ``MetricName`` that identifies a metric.
    """
    self.step = step
    self.metric = metric

  def __eq__(self, other):
    return (self.step == other.step and
            self.metric == other.metric)

  def __ne__(self, other):
    # Defined explicitly so that ``!=`` agrees with ``__eq__``.
    return not self.__eq__(other)

  def __str__(self):
    return 'MetricKey(step={}, metric={})'.format(
        self.step, self.metric)

  def __hash__(self):
    return hash((self.step, self.metric))


class MetricResult(object):
  """Keeps track of the status of a metric within a single bundle.

  It contains the physical and logical updates to the metric. Physical updates
  are updates that have not necessarily been committed, but that have been made
  during pipeline execution. Logical updates are updates that have been
  committed.

  Attributes:
    key: A ``MetricKey`` that identifies the metric and bundle of this result.
    committed: The committed (logical) updates of the metric. This attribute's
      type is that of the underlying cell data (e.g. int, DistributionData).
    attempted: The attempted (physical) updates of the metric. This
      attribute's type is that of the underlying cell data (e.g. int,
      DistributionData).
  """
  def __init__(self, key, committed, attempted):
    """Initializes ``MetricResult``.
    Args:
      key: A ``MetricKey`` object.
      committed: Metric data that has been committed (e.g. logical updates)
      attempted: Metric data that has been attempted (e.g. physical updates)
    """
    self.key = key
    self.committed = committed
    self.attempted = attempted

  def __eq__(self, other):
    return (self.key == other.key and
            self.committed == other.committed and
            self.attempted == other.attempted)

  def __ne__(self, other):
    # Defined explicitly so that ``!=`` agrees with ``__eq__``.
    return not self.__eq__(other)

  def __str__(self):
    return 'MetricResult(key={}, committed={}, attempted={})'.format(
        self.key, self.committed, self.attempted)


class MetricsEnvironment(object):
  """Holds the MetricsContainer for every thread and other metric information.

  This class is not meant to be instantiated, instead being used to keep
  track of global state.
  """
  METRICS_SUPPORTED = False
  _METRICS_SUPPORTED_LOCK = threading.Lock()

  # Thread-local storage; its ``container`` attribute, when set, is the
  # current thread's MetricsContainer.
  PER_THREAD = threading.local()

  @classmethod
  def set_metrics_supported(cls, supported):
    """Sets the class-wide ``METRICS_SUPPORTED`` flag."""
    with cls._METRICS_SUPPORTED_LOCK:
      cls.METRICS_SUPPORTED = supported

  @classmethod
  def current_container(cls):
    """Returns the current thread's container, or None if none is set."""
    try:
      return cls.PER_THREAD.container
    except AttributeError:
      return None

  @classmethod
  def set_current_container(cls, container):
    """Sets ``container`` as the current thread's container."""
    cls.PER_THREAD.container = container

  @classmethod
  def unset_current_container(cls):
    """Removes the current thread's container, if there is one."""
    try:
      del cls.PER_THREAD.container
    except AttributeError:
      # Nothing was set on this thread. ``current_container`` already treats
      # this case as "no container", so unsetting is a no-op here.
      pass


class MetricsContainer(object):
  """Holds the metrics of a single step and a single bundle."""
  def __init__(self, step_name):
    self.step_name = step_name
    # Cells are created on first access, keyed by metric name.
    self.counters = defaultdict(CounterCell)
    self.distributions = defaultdict(DistributionCell)

  def get_counter(self, metric_name):
    """Returns the ``CounterCell`` for ``metric_name``, creating it if new."""
    return self.counters[metric_name]

  def get_distribution(self, metric_name):
    """Returns the ``DistributionCell`` for ``metric_name``, creating it if new.
    """
    return self.distributions[metric_name]

  def _get_updates(self, filter=None):
    """Return cumulative values of metrics filtered according to a lambda.

    This returns all the cumulative values for all metrics after filtering
    them with the filter parameter lambda function. If None is passed in,
    then cumulative values for all metrics are returned.
    """
    if filter is None:
      filter = lambda v: True
    counters = {MetricKey(self.step_name, k): v.get_cumulative()
                for k, v in self.counters.items()
                if filter(v)}

    distributions = {MetricKey(self.step_name, k): v.get_cumulative()
                     for k, v in self.distributions.items()
                     if filter(v)}

    return MetricUpdates(counters, distributions)

  def get_updates(self):
    """Return cumulative values of metrics that changed since the last commit.

    This returns all the cumulative values for all metrics only if their state
    prior to the function call was COMMITTING or DIRTY.
    """
    # ``before_commit`` has a side effect: every non-CLEAN cell it is called
    # on is moved to COMMITTING.
    return self._get_updates(filter=lambda v: v.commit.before_commit())

  def get_cumulative(self):
    """Return MetricUpdates with cumulative values of all metrics in container.

    This returns all the cumulative values for all metrics regardless of whether
    they have been committed or not.
    """
    return self._get_updates()


class ScopedMetricsContainer(object):
  """Context manager that makes a container current for a ``with`` block."""
  def __init__(self, container):
    # NOTE(review): the container to restore is captured at construction
    # time, on the constructing thread, not when the block is entered.
    # Confirm callers always construct and enter on the same thread.
    self._old_container = MetricsEnvironment.current_container()
    self._container = container

  def __enter__(self):
    MetricsEnvironment.set_current_container(self._container)
    return self._container

  def __exit__(self, type, value, traceback):
    MetricsEnvironment.set_current_container(self._old_container)


class MetricUpdates(object):
  """Contains updates for several metrics.

  A metric update is an object containing information to update a metric.
  For Distribution metrics, it is DistributionData, and for Counter metrics,
  it's an int.
  """
  def __init__(self, counters=None, distributions=None):
    """Create a MetricUpdates object.

    Args:
      counters: Dictionary of MetricKey:MetricUpdate updates.
      distributions: Dictionary of MetricKey:MetricUpdate objects.
    """
    self.counters = counters or {}
    self.distributions = distributions or {}

# http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/execution_test.py
# ----------------------------------------------------------------------
# diff --git a/sdks/python/apache_beam/metrics/execution_test.py b/sdks/python/apache_beam/metrics/execution_test.py
# new file mode 100644
# index 0000000..54569c1
# --- /dev/null
# +++ b/sdks/python/apache_beam/metrics/execution_test.py
# @@ -0,0 +1,131 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unittest

from apache_beam.metrics.cells import CellCommitState
from apache_beam.metrics.execution import MetricsContainer
from apache_beam.metrics.execution import ScopedMetricsContainer
from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.metric import Metrics
from apache_beam.metrics.metricbase import MetricName


class TestMetricsContainer(unittest.TestCase):
  """Tests for ``MetricsContainer`` and ``ScopedMetricsContainer``."""

  def test_create_new_counter(self):
    mc = MetricsContainer('astep')
    # Membership tests replace dict.has_key, which only exists on Python 2.
    self.assertNotIn(MetricName('namespace', 'name'), mc.counters)
    mc.get_counter(MetricName('namespace', 'name'))
    self.assertIn(MetricName('namespace', 'name'), mc.counters)

  def test_scoped_container(self):
    c1 = MetricsContainer('mystep')
    c2 = MetricsContainer('myinternalstep')
    with ScopedMetricsContainer(c1):
      self.assertEqual(c1, MetricsEnvironment.current_container())
      counter = Metrics.counter('ns', 'name')
      counter.inc(2)

      with ScopedMetricsContainer(c2):
        self.assertEqual(c2, MetricsEnvironment.current_container())
        counter = Metrics.counter('ns', 'name')
        counter.inc(3)
        # list() so the comparison also holds when items() returns a view.
        self.assertEqual(
            list(c2.get_cumulative().counters.items()),
            [(MetricKey('myinternalstep', MetricName('ns', 'name')), 3)])

      # Leaving the inner scope restores the outer container.
      self.assertEqual(c1, MetricsEnvironment.current_container())
      counter = Metrics.counter('ns', 'name')
      counter.inc(4)
      self.assertEqual(
          list(c1.get_cumulative().counters.items()),
          [(MetricKey('mystep', MetricName('ns', 'name')), 6)])

  def test_add_to_counter(self):
    mc = MetricsContainer('astep')
    counter = mc.get_counter(MetricName('namespace', 'name'))
    counter.inc()
    counter = mc.get_counter(MetricName('namespace', 'name'))
    self.assertEqual(counter.value, 1)

  def test_get_cumulative_or_updates(self):
    mc = MetricsContainer('astep')

    clean_values = []
    dirty_values = []
    for i in range(1, 11):
      counter = mc.get_counter(MetricName('namespace', 'name{}'.format(i)))
      distribution = mc.get_distribution(
          MetricName('namespace', 'name{}'.format(i)))
      counter.inc(i)
      distribution.update(i)
      if i % 2 == 0:
        # Some are left to be DIRTY (i.e. not yet committed).
        # Some are left to be CLEAN (i.e. already committed).
        dirty_values.append(i)
        continue
      # Assert: Counter/Distribution is DIRTY or COMMITTING (not CLEAN)
      self.assertEqual(distribution.commit.before_commit(), True)
      self.assertEqual(counter.commit.before_commit(), True)
      distribution.commit.after_commit()
      counter.commit.after_commit()
      # Assert: Counter/Distribution has been committed, therefore it's CLEAN
      self.assertEqual(counter.commit.state, CellCommitState.CLEAN)
      self.assertEqual(distribution.commit.state, CellCommitState.CLEAN)
      clean_values.append(i)

    # Retrieve NON-COMMITTED updates.
    logical = mc.get_updates()
    self.assertEqual(len(logical.counters), 5)
    self.assertEqual(len(logical.distributions), 5)
    self.assertEqual(set(dirty_values),
                     set(logical.counters.values()))
    # Retrieve ALL updates.
    cumulative = mc.get_cumulative()
    self.assertEqual(len(cumulative.counters), 10)
    self.assertEqual(len(cumulative.distributions), 10)
    self.assertEqual(set(dirty_values + clean_values),
                     set(cumulative.counters.values()))


class TestMetricsEnvironment(unittest.TestCase):
  """Tests for the per-thread container kept by ``MetricsEnvironment``."""

  def test_uses_right_container(self):
    c1 = MetricsContainer('step1')
    c2 = MetricsContainer('step2')
    counter = Metrics.counter('ns', 'name')
    MetricsEnvironment.set_current_container(c1)
    try:
      counter.inc()
      MetricsEnvironment.set_current_container(c2)
      counter.inc(3)
    finally:
      # Always clear the thread-local container so that a failure here does
      # not leak it into other tests.
      MetricsEnvironment.unset_current_container()

    self.assertEqual(
        list(c1.get_cumulative().counters.items()),
        [(MetricKey('step1', MetricName('ns', 'name')), 1)])

    self.assertEqual(
        list(c2.get_cumulative().counters.items()),
        [(MetricKey('step2', MetricName('ns', 'name')), 3)])

  def test_no_container(self):
    self.assertEqual(MetricsEnvironment.current_container(),
                     None)


if __name__ == '__main__':
  unittest.main()

# http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/metric.py
# ----------------------------------------------------------------------
# diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py
# new file mode 100644
# index 0000000..13ca77b
# --- /dev/null
# +++ b/sdks/python/apache_beam/metrics/metric.py
# @@ -0,0 +1,165 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
User-facing classes for Metrics API.

The classes in this file allow users to define and use metrics to be collected
and displayed as part of their pipeline execution.

- Metrics - This class lets pipeline and transform writers create and access
    metric objects such as counters, distributions, etc.
"""
import inspect

from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.metrics.metricbase import Counter, Distribution
from apache_beam.metrics.metricbase import MetricName


class Metrics(object):
  """Lets users create/access metric objects during pipeline execution.
  """
  @staticmethod
  def get_namespace(namespace):
    """Returns the namespace string for a class or a string.

    Args:
      namespace: A class, which maps to '<module>.<class name>', or a string,
        which is returned unchanged.

    Raises:
      ValueError: If ``namespace`` is neither a class nor a ``str``.
    """
    if inspect.isclass(namespace):
      return '{}.{}'.format(namespace.__module__, namespace.__name__)
    elif isinstance(namespace, str):
      return namespace
    else:
      raise ValueError('Unknown namespace type')

  @staticmethod
  def counter(namespace, name):
    """Obtains or creates a Counter metric.

    Args:
      namespace: A class or string that gives the namespace to a metric
      name: A string that gives a unique name to a metric

    Returns:
      A Counter object.
    """
    namespace = Metrics.get_namespace(namespace)
    return Metrics.DelegatingCounter(MetricName(namespace, name))

  @staticmethod
  def distribution(namespace, name):
    """Obtains or creates a Distribution metric.

    Distribution metrics are restricted to integer-only distributions.

    Args:
      namespace: A class or string that gives the namespace to a metric
      name: A string that gives a unique name to a metric

    Returns:
      A Distribution object.
    """
    namespace = Metrics.get_namespace(namespace)
    return Metrics.DelegatingDistribution(MetricName(namespace, name))

  class DelegatingCounter(Counter):
    """Counter that forwards to the current thread's container, if any."""
    def __init__(self, metric_name):
      self.metric_name = metric_name

    def inc(self, n=1):
      container = MetricsEnvironment.current_container()
      # With no current container the update is silently dropped.
      if container is not None:
        container.get_counter(self.metric_name).inc(n)

  class DelegatingDistribution(Distribution):
    """Distribution that forwards to the current thread's container, if any."""
    def __init__(self, metric_name):
      self.metric_name = metric_name

    def update(self, value):
      container = MetricsEnvironment.current_container()
      # With no current container the update is silently dropped.
      if container is not None:
        container.get_distribution(self.metric_name).update(value)


class MetricResults(object):
  """Base class for querying metric results."""

  @staticmethod
  def matches(filter, metric_key):
    """Returns whether ``metric_key`` satisfies ``filter``.

    A ``None`` filter matches everything. Otherwise the key's step, namespace
    and name must each be contained in the corresponding set of the filter.
    """
    if filter is None:
      return True

    # NOTE(review): an empty set in the filter matches nothing, so a filter
    # that only restricts e.g. names never matches. Confirm whether an empty
    # set is meant to act as a wildcard.
    return (metric_key.step in filter.steps and
            metric_key.metric.namespace in filter.namespaces and
            metric_key.metric.name in filter.names)

  def query(self, filter):
    """Returns the results matching ``filter``; implemented by subclasses."""
    raise NotImplementedError


class MetricsFilter(object):
  """Simple object to filter metrics results.

  It filters by matching a result's step-namespace-name with three internal
  sets. No execution/matching logic is added to this object, so that it may
  be used to construct arguments as an RPC request. It is left for runners
  to implement matching logic by themselves.
  """
  def __init__(self):
    self._names = set()
    self._namespaces = set()
    self._steps = set()

  @property
  def steps(self):
    return frozenset(self._steps)

  @property
  def names(self):
    return frozenset(self._names)

  @property
  def namespaces(self):
    return frozenset(self._namespaces)

  def with_name(self, name):
    """Adds a single metric name to the filter and returns the filter."""
    return self.with_names([name])

  def with_names(self, names):
    """Adds an iterable of metric names to the filter and returns the filter.

    Raises:
      ValueError: If ``names`` is a string rather than an iterable of strings.
    """
    if isinstance(names, str):
      raise ValueError('Names must be an iterable, not a string')

    # Names belong in the name set; they were previously added to the step
    # set, which left ``names`` permanently empty.
    self._names.update(names)
    return self

  def with_namespace(self, namespace):
    """Adds a single namespace to the filter and returns the filter."""
    return self.with_namespaces([namespace])

  def with_namespaces(self, namespaces):
    """Adds an iterable of namespaces to the filter and returns the filter.

    Each namespace may be a class or a string, as accepted by
    ``Metrics.get_namespace``.

    Raises:
      ValueError: If ``namespaces`` is a string rather than an iterable.
    """
    if isinstance(namespaces, str):
      raise ValueError('Namespaces must be an iterable, not a string')

    self._namespaces.update([Metrics.get_namespace(ns) for ns in namespaces])
    return self

  def with_step(self, step):
    """Adds a single step name to the filter and returns the filter."""
    return self.with_steps([step])

  def with_steps(self, steps):
    """Adds an iterable of step names to the filter and returns the filter.

    Raises:
      ValueError: If ``steps`` is a string rather than an iterable of strings.
    """
    # Check the actual argument; the previous check referenced the undefined
    # name ``namespaces`` and raised NameError on every call.
    if isinstance(steps, str):
      raise ValueError('Steps must be an iterable, not a string')

    self._steps.update(steps)
    return self

# http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/metric_test.py
# ----------------------------------------------------------------------
# diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py
# new file mode 100644
# index 0000000..c478a85
# --- /dev/null
# +++ b/sdks/python/apache_beam/metrics/metric_test.py
# @@ -0,0 +1,85 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unittest

from apache_beam.metrics.cells import DistributionData
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.execution import MetricsContainer
from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.metrics.metric import Metrics
from apache_beam.metrics.metricbase import MetricName


class NameTest(unittest.TestCase):
  """Checks attribute access and equality of MetricName and MetricKey."""

  def test_basic_metric_name(self):
    metric_name = MetricName('namespace1', 'name1')
    self.assertEqual(metric_name.namespace, 'namespace1')
    self.assertEqual(metric_name.name, 'name1')
    self.assertEqual(metric_name, MetricName('namespace1', 'name1'))

    metric_key = MetricKey('step1', metric_name)
    self.assertEqual(metric_key.step, 'step1')
    self.assertEqual(metric_key.metric.namespace, 'namespace1')
    self.assertEqual(metric_key.metric.name, 'name1')
    same_key = MetricKey('step1', MetricName('namespace1', 'name1'))
    self.assertEqual(metric_key, same_key)


class MetricsTest(unittest.TestCase):
  """Checks namespace resolution and the delegating metric objects."""

  def test_get_namespace_class(self):
    class MyClass(object):
      pass

    expected = '{}.{}'.format(MyClass.__module__, MyClass.__name__)
    self.assertEqual(expected, Metrics.get_namespace(MyClass))

  def test_get_namespace_string(self):
    self.assertEqual('MyNamespace', Metrics.get_namespace('MyNamespace'))

  def test_get_namespace_error(self):
    # An arbitrary instance is neither a class nor a string.
    not_a_namespace = object()
    with self.assertRaises(ValueError):
      Metrics.get_namespace(not_a_namespace)

  def test_create_counter_distribution(self):
    MetricsEnvironment.set_current_container(MetricsContainer('mystep'))
    metric_name = 'aName'
    counter_key = MetricName('aCounterNamespace', metric_name)
    distro_key = MetricName('aDistributionNamespace', metric_name)

    counter = Metrics.counter(counter_key.namespace, metric_name)
    distro = Metrics.distribution(distro_key.namespace, metric_name)
    counter.inc(10)
    counter.dec(3)
    for value in (10, 2):
      distro.update(value)
    self.assertIsInstance(counter, Metrics.DelegatingCounter)
    self.assertIsInstance(distro, Metrics.DelegatingDistribution)

    # The recorded values live in the container, not in the delegating
    # objects, so they must survive the deletion of the latter.
    del distro
    del counter

    container = MetricsEnvironment.current_container()
    counter_total = container.counters[counter_key].get_cumulative()
    self.assertEqual(counter_total, 7)
    distro_total = container.distributions[distro_key].get_cumulative()
    self.assertEqual(distro_total, DistributionData(12, 2, 2, 10))


if __name__ == '__main__':
  unittest.main()

# http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/metricbase.py
# ----------------------------------------------------------------------
# diff --git a/sdks/python/apache_beam/metrics/metricbase.py b/sdks/python/apache_beam/metrics/metricbase.py
# new file mode 100644
# index 0000000..1ad6962
# --- /dev/null
# +++ b/sdks/python/apache_beam/metrics/metricbase.py
# @@ -0,0 +1,81 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# (Start of the metricbase.py module docstring; it continues on the next
# line of this message.)
# """
# The classes in this file are interfaces for metrics. They are not intended
# to be subclassed or created directly by users.
# (Tail of the metricbase.py module docstring opened on the previous line of
# this message.)
# To work with and access metrics,
# users should use the classes and methods exposed in metric.py.
#
# Available classes:
# - Metric - Base interface of a metrics object.
# - Counter - Counter metric interface. Allows a count to be incremented or
#     decremented during pipeline execution.
# - Distribution - Distribution Metric interface. Allows statistics about the
#     distribution of a variable to be collected during pipeline execution.
# - MetricName - Namespace and name used to refer to a Metric.
# """


class MetricName(object):
  """The name of a metric.

  The name of a metric consists of a namespace and a name. The namespace
  allows grouping related metrics together and also prevents collisions
  between multiple metrics of the same name.

  Instances compare equal when both namespace and name are equal, and hash
  consistently with that equality, so they can be used as dictionary keys.
  """
  def __init__(self, namespace, name):
    """Initializes ``MetricName``.

    Args:
      namespace: A string with the namespace of a metric.
      name: A string with the name of a metric.
    """
    self.namespace = namespace
    self.name = name

  def __eq__(self, other):
    # Defer to the other operand for foreign types instead of failing with
    # AttributeError on objects that have no namespace/name (e.g. None).
    if not isinstance(other, MetricName):
      return NotImplemented
    return (self.namespace == other.namespace and
            self.name == other.name)

  def __ne__(self, other):
    # Python 2 does not derive != from ==, so it is spelled out here.
    equal = self.__eq__(other)
    if equal is NotImplemented:
      return NotImplemented
    return not equal

  def __str__(self):
    return 'MetricName(namespace={}, name={})'.format(
        self.namespace, self.name)

  def __hash__(self):
    return hash((self.namespace, self.name))


class Metric(object):
  """Base interface of a metric object."""
  pass


class Counter(Metric):
  """Counter metric interface. Allows a count to be incremented/decremented
  during pipeline execution."""
  def inc(self, n=1):
    """Increments the counter by ``n``; subclasses must implement this."""
    raise NotImplementedError

  def dec(self, n=1):
    """Decrements the counter by ``n`` by delegating to ``inc(-n)``."""
    self.inc(-n)


class Distribution(Metric):
  """Distribution Metric interface. Allows statistics about the
  distribution of a variable to be collected during pipeline execution."""
  def update(self, value):
    """Records ``value`` in the distribution; subclasses must implement."""
    raise NotImplementedError

# http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/direct/direct_metrics.py
# ----------------------------------------------------------------------
# diff --git a/sdks/python/apache_beam/runners/direct/direct_metrics.py b/sdks/python/apache_beam/runners/direct/direct_metrics.py
# new file mode 100644
# index 0000000..9d23487
# --- /dev/null
# +++ b/sdks/python/apache_beam/runners/direct/direct_metrics.py
# @@ -0,0 +1,112 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# (Start of the direct_metrics.py module docstring; its closing quotes are on
# the next line of this message.)
# """
# DirectRunner implementation of MetricResults. It is in charge not only of
# responding to queries of current metrics, but also of keeping the common
# state consistent.
# """  (closing quotes of the module docstring opened on the previous line)
from collections import defaultdict
import threading

from apache_beam.metrics.cells import CounterAggregator
from apache_beam.metrics.cells import DistributionAggregator
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.execution import MetricResult
from apache_beam.metrics.metric import MetricResults


class DirectMetrics(MetricResults):
  """Holds one ``DirectMetric`` per metric key and answers queries on them.

  Counters and distributions are kept in separate dictionaries whose entries
  are created on first access.
  """
  def __init__(self):
    self._counters = defaultdict(
        lambda: DirectMetric(CounterAggregator()))
    self._distributions = defaultdict(
        lambda: DirectMetric(DistributionAggregator()))

  def _apply_operation(self, bundle, updates, op):
    """Calls ``op(metric, bundle, update)`` for every update in ``updates``.

    Args:
      bundle: The bundle the updates belong to.
      updates: An object exposing ``counters`` and ``distributions``
        dictionaries that map metric keys to update values.
      op: A callable taking the ``DirectMetric``, the bundle and the update.
    """
    for k, v in updates.counters.items():
      op(self._counters[k], bundle, v)

    for k, v in updates.distributions.items():
      op(self._distributions[k], bundle, v)

  def commit_logical(self, bundle, updates):
    """Adds ``updates`` to the committed value of each metric."""
    op = lambda obj, bundle, update: obj.commit_logical(bundle, update)
    self._apply_operation(bundle, updates, op)

  def commit_physical(self, bundle, updates):
    """Adds ``updates`` to the finished attempted value of each metric."""
    op = lambda obj, bundle, update: obj.commit_physical(bundle, update)
    self._apply_operation(bundle, updates, op)

  def update_physical(self, bundle, updates):
    """Records ``updates`` as the in-flight attempted value for ``bundle``."""
    op = lambda obj, bundle, update: obj.update_physical(bundle, update)
    self._apply_operation(bundle, updates, op)

  def _results(self, cells, filter):
    """Builds the ``MetricResult`` list for the entries matching ``filter``."""
    return [MetricResult(MetricKey(k.step, k.metric),
                         v.extract_committed(),
                         v.extract_latest_attempted())
            for k, v in cells.items()
            if self.matches(filter, k)]

  def query(self, filter=None):
    """Returns the current results of the metrics selected by ``filter``.

    Args:
      filter: A ``MetricsFilter``, or ``None`` to select every metric.

    Returns:
      A dictionary with the keys ``'counters'`` and ``'distributions'``, each
      holding a list of ``MetricResult`` objects.
    """
    return {'counters': self._results(self._counters, filter),
            'distributions': self._results(self._distributions, filter)}


class DirectMetric(object):
  """ Keeps a consistent state for a single metric.

  It keeps track of the metric's physical and logical updates.
  It's thread safe: attempted state is guarded by ``_attempted_lock`` and
  committed state by ``_committed_lock``.
  """
  def __init__(self, aggregator):
    """Initializes the metric with the aggregator's zero value.

    Args:
      aggregator: Object providing ``zero()``, ``combine(a, b)`` and
        ``result(value)``.
    """
    self.aggregator = aggregator
    self._attempted_lock = threading.Lock()
    self.finished_attempted = aggregator.zero()
    self.inflight_attempted = {}
    self._committed_lock = threading.Lock()
    self.finished_committed = aggregator.zero()

  def commit_logical(self, bundle, update):
    """Combines ``update`` into the committed value."""
    with self._committed_lock:
      self.finished_committed = self.aggregator.combine(update,
                                                        self.finished_committed)

  def commit_physical(self, bundle, update):
    """Combines ``update`` into the finished attempted value.

    Any in-flight value recorded for ``bundle`` is discarded, because
    ``update`` supersedes it.
    """
    with self._attempted_lock:
      self.finished_attempted = self.aggregator.combine(update,
                                                        self.finished_attempted)
      self.inflight_attempted.pop(bundle, None)

  def update_physical(self, bundle, update):
    """Replaces the in-flight attempted value recorded for ``bundle``."""
    # Same lock as commit_physical: both mutate inflight_attempted.
    with self._attempted_lock:
      self.inflight_attempted[bundle] = update

  def extract_committed(self):
    """Returns the aggregator's result for the committed value."""
    with self._committed_lock:
      committed = self.finished_committed
    return self.aggregator.result(committed)

  def extract_latest_attempted(self):
    """Returns the result of the finished plus all in-flight attempted values.
    """
    # Snapshot under the lock so that concurrent update/commit calls cannot
    # resize the dictionary while it is being iterated.
    with self._attempted_lock:
      res = self.finished_attempted
      inflight = list(self.inflight_attempted.values())

    for u in inflight:
      res = self.aggregator.combine(res, u)

    return self.aggregator.result(res)

# http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/direct/direct_metrics_test.py
# ----------------------------------------------------------------------
# diff --git a/sdks/python/apache_beam/runners/direct/direct_metrics_test.py b/sdks/python/apache_beam/runners/direct/direct_metrics_test.py
# new file mode 100644
# index 0000000..256b91f
# --- /dev/null
# +++ b/sdks/python/apache_beam/runners/direct/direct_metrics_test.py
# @@ -0,0 +1,211 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unittest

import hamcrest as hc

from apache_beam.metrics.metricbase import MetricName
from apache_beam.metrics.execution import MetricUpdates
from apache_beam.metrics.execution import MetricResult
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.cells import DistributionData
from apache_beam.metrics.cells import DistributionResult
from apache_beam.runners.direct.direct_metrics import DirectMetrics


def _result(step, name, committed, attempted):
  """Builds the MetricResult expected for the metric ``name`` of ``step``."""
  return MetricResult(MetricKey(step, name), committed, attempted)


def _dist(*data):
  """Builds a DistributionResult from the DistributionData arguments."""
  return DistributionResult(DistributionData(*data))


class DirectMetricsTest(unittest.TestCase):
  """Exercises logical/physical commits and queries on DirectMetrics."""

  name1 = MetricName('namespace1', 'name1')
  name2 = MetricName('namespace1', 'name2')
  name3 = MetricName('namespace2', 'name1')

  bundle1 = object()  # For this test, any object can be a bundle
  bundle2 = object()

  def test_combiner_functions(self):
    direct_metrics = DirectMetrics()

    counter_cell = direct_metrics._counters['anykey']
    counter_cell.commit_logical(self.bundle1, 5)
    self.assertEqual(counter_cell.extract_committed(), 5)
    with self.assertRaises(TypeError):
      counter_cell.commit_logical(self.bundle1, None)

    dist_cell = direct_metrics._distributions['anykey']
    dist_cell.commit_logical(self.bundle1, DistributionData(4, 1, 4, 4))
    self.assertEqual(dist_cell.extract_committed(), _dist(4, 1, 4, 4))
    with self.assertRaises(AttributeError):
      dist_cell.commit_logical(self.bundle1, None)

  def test_commit_logical_no_filter(self):
    direct_metrics = DirectMetrics()

    first_updates = MetricUpdates(
        counters={MetricKey('step1', self.name1): 5,
                  MetricKey('step1', self.name2): 8},
        distributions={
            MetricKey('step1', self.name1): DistributionData(8, 2, 3, 5)})
    direct_metrics.commit_logical(self.bundle1, first_updates)

    second_updates = MetricUpdates(
        counters={MetricKey('step2', self.name1): 7,
                  MetricKey('step1', self.name2): 4},
        distributions={
            MetricKey('step1', self.name1): DistributionData(4, 1, 4, 4)})
    direct_metrics.commit_logical(self.bundle1, second_updates)

    # Only logical commits happened, so every attempted value is still zero.
    queried = direct_metrics.query()
    expected_counters = [
        _result('step1', self.name2, 12, 0),
        _result('step2', self.name1, 7, 0),
        _result('step1', self.name1, 5, 0)]
    hc.assert_that(queried['counters'],
                   hc.contains_inanyorder(*expected_counters))
    hc.assert_that(
        queried['distributions'],
        hc.contains_inanyorder(
            _result('step1', self.name1,
                    _dist(12, 3, 3, 5),
                    _dist(0, 0, None, None))))

  def test_apply_physical_no_filter(self):
    direct_metrics = DirectMetrics()
    direct_metrics.update_physical(
        object(),
        MetricUpdates(counters={MetricKey('step1', self.name1): 5,
                                MetricKey('step1', self.name3): 8}))

    direct_metrics.update_physical(
        object(),
        MetricUpdates(counters={MetricKey('step2', self.name1): 7,
                                MetricKey('step1', self.name3): 4}))

    expected_counters = [
        _result('step1', self.name1, 0, 5),
        _result('step1', self.name3, 0, 12),
        _result('step2', self.name1, 0, 7)]
    hc.assert_that(direct_metrics.query()['counters'],
                   hc.contains_inanyorder(*expected_counters))

    # Committing an empty update for an unrelated bundle changes nothing.
    direct_metrics.commit_physical(object(), MetricUpdates())
    hc.assert_that(direct_metrics.query()['counters'],
                   hc.contains_inanyorder(*expected_counters))

  def test_apply_physical_logical(self):
    direct_metrics = DirectMetrics()
    zero = _dist(0, 0, None, None)

    # Step 1: in-flight physical updates only affect the attempted values.
    direct_metrics.update_physical(
        object(),
        MetricUpdates(
            counters={MetricKey('step1', self.name1): 7,
                      MetricKey('step1', self.name2): 5,
                      MetricKey('step2', self.name1): 1},
            distributions={
                MetricKey('step1', self.name1): DistributionData(3, 1, 3, 3),
                MetricKey('step2', self.name3): DistributionData(8, 2, 4, 4)}))
    queried = direct_metrics.query()
    hc.assert_that(
        queried['counters'],
        hc.contains_inanyorder(
            _result('step1', self.name1, 0, 7),
            _result('step1', self.name2, 0, 5),
            _result('step2', self.name1, 0, 1)))
    hc.assert_that(
        queried['distributions'],
        hc.contains_inanyorder(
            _result('step1', self.name1, zero, _dist(3, 1, 3, 3)),
            _result('step2', self.name3, zero, _dist(8, 2, 4, 4))))

    # Step 2: a physical commit from another bundle adds to the attempted
    # values on top of what is still in flight.
    direct_metrics.commit_physical(
        object(),
        MetricUpdates(
            counters={MetricKey('step1', self.name1): -3,
                      MetricKey('step2', self.name1): -5},
            distributions={
                MetricKey('step1', self.name1): DistributionData(8, 4, 1, 5),
                MetricKey('step2', self.name2): DistributionData(8, 8, 1, 1)}))
    queried = direct_metrics.query()
    hc.assert_that(
        queried['counters'],
        hc.contains_inanyorder(
            _result('step1', self.name1, 0, 4),
            _result('step1', self.name2, 0, 5),
            _result('step2', self.name1, 0, -4)))
    hc.assert_that(
        queried['distributions'],
        hc.contains_inanyorder(
            _result('step1', self.name1, zero, _dist(11, 5, 1, 5)),
            _result('step2', self.name3, zero, _dist(8, 2, 4, 4)),
            _result('step2', self.name2, zero, _dist(8, 8, 1, 1))))

    # Step 3: a logical commit fills in the committed values and leaves the
    # attempted ones untouched.
    direct_metrics.commit_logical(
        object(),
        MetricUpdates(
            counters={MetricKey('step1', self.name1): 3,
                      MetricKey('step1', self.name2): 5,
                      MetricKey('step2', self.name1): -3},
            distributions={
                MetricKey('step1', self.name1): DistributionData(11, 5, 1, 5),
                MetricKey('step2', self.name2): DistributionData(8, 8, 1, 1),
                MetricKey('step2', self.name3): DistributionData(4, 1, 4, 4)}))
    queried = direct_metrics.query()
    hc.assert_that(
        queried['counters'],
        hc.contains_inanyorder(
            _result('step1', self.name1, 3, 4),
            _result('step1', self.name2, 5, 5),
            _result('step2', self.name1, -3, -4)))
    hc.assert_that(
        queried['distributions'],
        hc.contains_inanyorder(
            _result('step1', self.name1,
                    _dist(11, 5, 1, 5), _dist(11, 5, 1, 5)),
            _result('step2', self.name3,
                    _dist(4, 1, 4, 4), _dist(8, 2, 4, 4)),
            _result('step2', self.name2,
                    _dist(8, 8, 1, 1), _dist(8, 8, 1, 1))))


if __name__ == '__main__':
  unittest.main()

# http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/direct/direct_runner.py
# ----------------------------------------------------------------------
# diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
# index 523eb05..a5c616b 100644
# --- a/sdks/python/apache_beam/runners/direct/direct_runner.py
# +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
# @@ -27,6 +27,7 @@ import collections
#  import logging
#  from apache_beam.runners.direct.bundle_factory import BundleFactory
# +from apache_beam.metrics.execution import MetricsEnvironment
#  from apache_beam.runners.runner import PipelineResult
#  from apache_beam.runners.runner import PipelineRunner
#  from apache_beam.runners.runner import PipelineState
# @@ -52,6 +53,7 @@ class DirectRunner(PipelineRunner):
#  from apache_beam.runners.direct.transform_evaluator import \
TransformEvaluatorRegistry + MetricsEnvironment.set_metrics_supported(True) logging.info('Running pipeline with DirectRunner.') self.visitor = ConsumerTrackingPipelineVisitor() pipeline.visit(self.visitor) @@ -152,6 +154,9 @@ class DirectPipelineResult(PipelineResult): def aggregated_values(self, aggregator_or_name): return self._evaluation_context.get_aggregator_values(aggregator_or_name) + def metrics(self): + return self._evaluation_context.metrics() + class EagerRunner(DirectRunner): http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/direct/evaluation_context.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py index 48d353b..34701f5 100644 --- a/sdks/python/apache_beam/runners/direct/evaluation_context.py +++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py @@ -26,6 +26,7 @@ from apache_beam.transforms import sideinputs from apache_beam.runners.direct.clock import Clock from apache_beam.runners.direct.watermark_manager import WatermarkManager from apache_beam.runners.direct.executor import TransformExecutor +from apache_beam.runners.direct.direct_metrics import DirectMetrics from apache_beam.utils import counters @@ -142,6 +143,7 @@ class EvaluationContext(object): self._pending_unblocked_tasks = [] self._counter_factory = counters.CounterFactory() self._cache = None + self._metrics = DirectMetrics() self._lock = threading.Lock() @@ -149,6 +151,10 @@ class EvaluationContext(object): assert not self._cache self._cache = cache + def metrics(self): + # TODO. Should this be made a @property? 
+ return self._metrics + @property def has_cache(self): return self._cache is not None @@ -187,6 +193,9 @@ class EvaluationContext(object): completed_bundle, result.transform, completed_timers, committed_bundles, result.watermark_hold) + self._metrics.commit_logical(completed_bundle, + result.logical_metric_updates()) + # If the result is for a view, update side inputs container. if (result.output_bundles and result.output_bundles[0].pcollection in self.views): http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/direct/executor.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index 378aecf..7e404f8 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -26,6 +26,9 @@ import threading import traceback from weakref import WeakValueDictionary +from apache_beam.metrics.execution import MetricsContainer +from apache_beam.metrics.execution import MetricsEnvironment + class ExecutorService(object): """Thread pool for executing tasks in parallel.""" @@ -266,6 +269,8 @@ class TransformExecutor(ExecutorService.CallableTask): def __call__(self): self._call_count += 1 assert self._call_count <= (1 + len(self._applied_transform.side_inputs)) + metrics_container = MetricsContainer(self._applied_transform.full_label) + MetricsEnvironment.set_current_container(metrics_container) for side_input in self._applied_transform.side_inputs: if side_input not in self._side_input_values: @@ -290,6 +295,7 @@ class TransformExecutor(ExecutorService.CallableTask): evaluator.process_element(value) result = evaluator.finish_bundle() + result.metric_updates = metrics_container.get_cumulative() if self._evaluation_context.has_cache: for uncommitted_bundle in result.output_bundles: @@ -308,6 +314,9 @@ class 
TransformExecutor(ExecutorService.CallableTask): logging.warning('Task failed: %s', traceback.format_exc(), exc_info=True) self._completion_callback.handle_exception(e) finally: + self._evaluation_context.metrics().commit_physical( + self._input_bundle, + metrics_container.get_cumulative()) self._transform_evaluation_state.complete(self) http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/direct/transform_result.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/transform_result.py b/sdks/python/apache_beam/runners/direct/transform_result.py index 298e629..59597dc 100644 --- a/sdks/python/apache_beam/runners/direct/transform_result.py +++ b/sdks/python/apache_beam/runners/direct/transform_result.py @@ -34,6 +34,10 @@ class TransformResult(object): self._watermark_hold = watermark_hold # Only used when caching (materializing) all values is requested. self._undeclared_tag_values = undeclared_tag_values + self.metric_updates = None + + def logical_metric_updates(self): + return self.metric_updates @property def transform(self): http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py index ff6a22e..ea86061 100644 --- a/sdks/python/apache_beam/runners/runner_test.py +++ b/sdks/python/apache_beam/runners/runner_test.py @@ -26,6 +26,8 @@ from datetime import datetime import json import unittest +import hamcrest as hc + import apache_beam as beam from apache_beam.internal import apiclient @@ -38,6 +40,12 @@ import apache_beam.transforms as ptransform from apache_beam.transforms.display import DisplayDataItem from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.metrics.cells import DistributionData +from 
apache_beam.metrics.cells import DistributionResult +from apache_beam.metrics.execution import MetricResult +from apache_beam.metrics.execution import MetricKey +from apache_beam.metrics.metricbase import MetricName + class RunnerTest(unittest.TestCase): default_properties = [ @@ -135,6 +143,41 @@ class RunnerTest(unittest.TestCase): self.assertEqual(len(disp_data), 3) self.assertEqual(disp_data, expected_data) + def test_direct_runner_metrics(self): + from apache_beam.metrics.metric import Metrics + + class MyDoFn(beam.DoFn): + def process(self, context): + count = Metrics.counter(self.__class__, 'elements') + count.inc() + distro = Metrics.distribution(self.__class__, 'element-dist') + distro.update(context.element) + return [context.element] + + runner = DirectRunner() + p = Pipeline(runner, + options=PipelineOptions(self.default_properties)) + # pylint: disable=expression-not-assigned + (p | 'create' >> ptransform.Create([1, 2, 3, 4, 5]) + | 'do' >> beam.ParDo(MyDoFn())) + result = p.run() + metrics = result.metrics().query() + namespace = '{}.{}'.format(MyDoFn.__module__, + MyDoFn.__name__) + hc.assert_that( + metrics['counters'], + hc.contains_inanyorder( + MetricResult( + MetricKey('do', MetricName(namespace, 'elements')), + 5, 5))) + hc.assert_that( + metrics['distributions'], + hc.contains_inanyorder( + MetricResult( + MetricKey('do', MetricName(namespace, 'element-dist')), + DistributionResult(DistributionData(15, 5, 1, 5)), + DistributionResult(DistributionData(15, 5, 1, 5))))) + def test_no_group_by_key_directly_after_bigquery(self): remote_runner = DataflowRunner() p = Pipeline(remote_runner,