Repository: beam
Updated Branches:
  refs/heads/python-sdk 8bf4c8059 -> d1906416e


[BEAM-147] Adding Metrics API to Python SDK


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/88bad042
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/88bad042
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/88bad042

Branch: refs/heads/python-sdk
Commit: 88bad042a0de8be89786c3e1c909f18b3be088f5
Parents: 8bf4c80
Author: Pablo <pabl...@google.com>
Authored: Tue Nov 22 14:31:26 2016 -0800
Committer: bchambers <bchamb...@google.com>
Committed: Thu Dec 29 16:02:31 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/metrics/__init__.py     |  17 +
 sdks/python/apache_beam/metrics/cells.py        | 315 +++++++++++++++++++
 sdks/python/apache_beam/metrics/cells_test.py   | 143 +++++++++
 sdks/python/apache_beam/metrics/execution.py    | 210 +++++++++++++
 .../apache_beam/metrics/execution_test.py       | 131 ++++++++
 sdks/python/apache_beam/metrics/metric.py       | 165 ++++++++++
 sdks/python/apache_beam/metrics/metric_test.py  |  85 +++++
 sdks/python/apache_beam/metrics/metricbase.py   |  81 +++++
 .../runners/direct/direct_metrics.py            | 112 +++++++
 .../runners/direct/direct_metrics_test.py       | 211 +++++++++++++
 .../apache_beam/runners/direct/direct_runner.py |   5 +
 .../runners/direct/evaluation_context.py        |   9 +
 .../apache_beam/runners/direct/executor.py      |   9 +
 .../runners/direct/transform_result.py          |   4 +
 sdks/python/apache_beam/runners/runner_test.py  |  43 +++
 15 files changed, 1540 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/__init__.py b/sdks/python/apache_beam/metrics/__init__.py
new file mode 100644
index 0000000..164d1a8
--- /dev/null
+++ b/sdks/python/apache_beam/metrics/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from apache_beam.metrics.metric import Metrics

http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/cells.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py
new file mode 100644
index 0000000..5a571f5
--- /dev/null
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -0,0 +1,315 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This file contains metric cell classes. A metric cell is used to accumulate
+in-memory changes to a metric. It represents a specific metric in a single
+context.
+
+Cells depend on a 'dirty-bit' in the CellCommitState class that tracks whether
+a cell's updates have been committed.
+"""
+
+import threading
+
+from apache_beam.metrics.metricbase import Counter
+from apache_beam.metrics.metricbase import Distribution
+
+
+class CellCommitState(object):
+  """Atomically tracks a cell's dirty/clean commit status.
+
+  Reporting a metric update works in a two-step process: First, updates to the
+  metric are received, and the metric is marked as 'dirty'. Later, updates are
+  committed, and then the cell may be marked as 'clean'.
+
+  The tracking of a cell's state is done conservatively: A metric may be
+  reported DIRTY even if updates have not occurred.
+
+  This class is thread-safe.
+  """
+
+  # Indicates that there have been changes to the cell since the last commit.
+  DIRTY = 0
+  # Indicates that there have NOT been changes to the cell since last commit.
+  CLEAN = 1
+  # Indicates that a commit of the current value is in progress.
+  COMMITTING = 2
+
+  def __init__(self):
+    """Initializes ``CellCommitState``.
+
+    A cell is initialized as dirty.
+    """
+    self._lock = threading.Lock()
+    self._state = CellCommitState.DIRTY
+
+  @property
+  def state(self):
+    with self._lock:
+      return self._state
+
+  def after_modification(self):
+    """Indicate that changes have been made to the metric being tracked.
+
+    Should be called after modification of the metric value.
+    """
+    with self._lock:
+      self._state = CellCommitState.DIRTY
+
+  def after_commit(self):
+    """Mark changes made up to the last call to ``before_commit`` as committed.
+
+    The next call to ``before_commit`` will return ``False`` unless there have
+    been changes made.
+    """
+    with self._lock:
+      if self._state == CellCommitState.COMMITTING:
+        self._state = CellCommitState.CLEAN
+
+  def before_commit(self):
+    """Check the dirty state, and mark the metric as committing.
+
+    After this call, the state is either CLEAN, or COMMITTING. If the state
+    was already CLEAN, then we simply return. If it was either DIRTY or
+    COMMITTING, then we set the cell as COMMITTING (e.g. in the middle of
+    a commit).
+
+    After a commit is successful, ``after_commit`` should be called.
+
+    Returns:
+      A boolean, which is false if the cell is CLEAN, and true otherwise.
+    """
+    with self._lock:
+      if self._state == CellCommitState.CLEAN:
+        return False
+      else:
+        self._state = CellCommitState.COMMITTING
+        return True
+
+
+class MetricCell(object):
+  """Accumulates in-memory changes to a metric.
+
+  A MetricCell represents a specific metric in a single context and bundle.
+  All subclasses must be thread safe, as these are used in the pipeline runners,
+  and may be subject to parallel/concurrent updates. Cells should only be used
+  directly within a runner.
+  """
+  def __init__(self):
+    self.commit = CellCommitState()
+    self._lock = threading.Lock()
+
+  def get_cumulative(self):
+    raise NotImplementedError
+
+
+class CounterCell(Counter, MetricCell):
+  """Tracks the current value and delta of a counter metric.
+
+  Each cell tracks the state of a metric independently per context per bundle.
+  Therefore, each metric has a different cell in each bundle, cells are
+  aggregated by the runner.
+
+  This class is thread safe.
+  """
+  def __init__(self, *args):
+    super(CounterCell, self).__init__(*args)
+    self.value = 0
+
+  def combine(self, other):
+    result = CounterCell()
+    result.inc(self.value + other.value)
+    return result
+
+  def inc(self, n=1):
+    with self._lock:
+      self.value += n
+      self.commit.after_modification()
+
+  def get_cumulative(self):
+    with self._lock:
+      return self.value
+
+
+class DistributionCell(Distribution, MetricCell):
+  """Tracks the current value and delta for a distribution metric.
+
+  Each cell tracks the state of a metric independently per context per bundle.
+  Therefore, each metric has a different cell in each bundle, that is later
+  aggregated.
+
+  This class is thread safe.
+  """
+  def __init__(self, *args):
+    super(DistributionCell, self).__init__(*args)
+    self.data = DistributionData(0, 0, None, None)
+
+  def combine(self, other):
+    result = DistributionCell()
+    result.data = self.data.combine(other.data)
+    return result
+
+  def update(self, value):
+    with self._lock:
+      self.commit.after_modification()
+      self._update(value)
+
+  def _update(self, value):
+    value = int(value)
+    self.data.count += 1
+    self.data.sum += value
+    self.data.min = (value
+                     if self.data.min is None or self.data.min > value
+                     else self.data.min)
+    self.data.max = (value
+                     if self.data.max is None or self.data.max < value
+                     else self.data.max)
+
+  def get_cumulative(self):
+    with self._lock:
+      return self.data.get_cumulative()
+
+
+class DistributionResult(object):
+  """The result of a Distribution metric.
+  """
+  def __init__(self, data):
+    self.data = data
+
+  def __eq__(self, other):
+    return self.data == other.data
+
+  @property
+  def max(self):
+    return self.data.max
+
+  @property
+  def min(self):
+    return self.data.min
+
+  @property
+  def count(self):
+    return self.data.count
+
+  @property
+  def sum(self):
+    return self.data.sum
+
+  @property
+  def mean(self):
+    """Returns the float mean of the distribution.
+
+    If the distribution contains no elements, it returns None.
+    """
+    if self.data.count == 0:
+      return None
+    else:
+      return float(self.data.sum)/self.data.count
+
+
+class DistributionData(object):
+  """The data structure that holds data about a distribution metric.
+
+  Distribution metrics are restricted to distributions of integers only.
+
+  This object is not thread safe, so it's not supposed to be modified
+  by anything other than the DistributionCell that contains it.
+  """
+  def __init__(self, sum, count, min, max):
+    self.sum = sum
+    self.count = count
+    self.min = min
+    self.max = max
+
+  def __eq__(self, other):
+    return (self.sum == other.sum and
+            self.count == other.count and
+            self.min == other.min and
+            self.max == other.max)
+
+  def __neq__(self, other):
+    return not self.__eq__(other)
+
+  def __repr__(self):
+    return '<DistributionData({}, {}, {}, {})>'.format(self.sum,
+                                                       self.count,
+                                                       self.min,
+                                                       self.max)
+
+  def get_cumulative(self):
+    return DistributionData(self.sum, self.count, self.min, self.max)
+
+  def combine(self, other):
+    if other is None:
+      return self
+    else:
+      new_min = (None if self.min is None and other.min is None else
+                 min(x for x in (self.min, other.min) if x is not None))
+      new_max = (None if self.max is None and other.max is None else
+                 max(x for x in (self.max, other.max) if x is not None))
+      return DistributionData(
+          self.sum + other.sum,
+          self.count + other.count,
+          new_min,
+          new_max)
+
+  @classmethod
+  def singleton(cls, value):
+    return DistributionData(value, 1, value, value)
+
+
+class MetricAggregator(object):
+  """Base interface for aggregating metric data during pipeline execution."""
+  def zero(self):
+    raise NotImplementedError
+
+  def combine(self, updates):
+    raise NotImplementedError
+
+  def result(self, x):
+    raise NotImplementedError
+
+
+class CounterAggregator(MetricAggregator):
+  """Aggregator for Counter metric data during pipeline execution.
+
+  Values aggregated should be ``int`` objects.
+  """
+  def zero(self):
+    return 0
+
+  def combine(self, x, y):
+    return int(x) + int(y)
+
+  def result(self, x):
+    return int(x)
+
+
+class DistributionAggregator(MetricAggregator):
+  """Aggregator for Distribution metric data during pipeline execution.
+
+  Values aggregated should be ``DistributionData`` objects.
+  """
+  def zero(self):
+    return DistributionData(0, 0, None, None)
+
+  def combine(self, x, y):
+    return x.combine(y)
+
+  def result(self, x):
+    return DistributionResult(x.get_cumulative())

http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/cells_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/cells_test.py b/sdks/python/apache_beam/metrics/cells_test.py
new file mode 100644
index 0000000..a4c8a43
--- /dev/null
+++ b/sdks/python/apache_beam/metrics/cells_test.py
@@ -0,0 +1,143 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import threading
+import unittest
+
+from apache_beam.metrics.cells import CounterCell
+from apache_beam.metrics.cells import DistributionCell
+from apache_beam.metrics.cells import DistributionData
+from apache_beam.metrics.cells import CellCommitState
+
+
+class TestCounterCell(unittest.TestCase):
+  @classmethod
+  def _modify_counter(cls, d):
+    for i in range(cls.NUM_ITERATIONS):
+      d.inc(i)
+
+  NUM_THREADS = 5
+  NUM_ITERATIONS = 100
+
+  def test_parallel_access(self):
+    # We create NUM_THREADS threads that concurrently modify the counter.
+    threads = []
+    c = CounterCell()
+    for _ in range(TestCounterCell.NUM_THREADS):
+      t = threading.Thread(target=TestCounterCell._modify_counter,
+                           args=(c,))
+      threads.append(t)
+      t.start()
+
+    for t in threads:
+      t.join()
+
+    total = (self.NUM_ITERATIONS * (self.NUM_ITERATIONS-1)/2 * self.NUM_THREADS)
+    self.assertEqual(c.get_cumulative(), total)
+
+  def test_basic_operations(self):
+    c = CounterCell()
+    c.inc(2)
+    self.assertEqual(c.get_cumulative(), 2)
+
+    c.dec(10)
+    self.assertEqual(c.get_cumulative(), -8)
+
+    c.dec()
+    self.assertEqual(c.get_cumulative(), -9)
+
+    c.inc()
+    self.assertEqual(c.get_cumulative(), -8)
+
+
+class TestDistributionCell(unittest.TestCase):
+  @classmethod
+  def _modify_distribution(cls, d):
+    for i in range(cls.NUM_ITERATIONS):
+      d.update(i)
+
+  NUM_THREADS = 5
+  NUM_ITERATIONS = 100
+
+  def test_parallel_access(self):
+    # We create NUM_THREADS threads that concurrently modify the distribution.
+    threads = []
+    d = DistributionCell()
+    for _ in range(TestDistributionCell.NUM_THREADS):
+      t = threading.Thread(target=TestDistributionCell._modify_distribution,
+                           args=(d,))
+      threads.append(t)
+      t.start()
+
+    for t in threads:
+      t.join()
+
+    total = (self.NUM_ITERATIONS * (self.NUM_ITERATIONS-1)/2 * self.NUM_THREADS)
+
+    count = (self.NUM_ITERATIONS * self.NUM_THREADS)
+
+    self.assertEqual(d.get_cumulative(),
+                     DistributionData(total, count, 0,
+                                      self.NUM_ITERATIONS - 1))
+
+  def test_basic_operations(self):
+    d = DistributionCell()
+    d.update(10)
+    self.assertEqual(d.get_cumulative(),
+                     DistributionData(10, 1, 10, 10))
+
+    d.update(2)
+    self.assertEqual(d.get_cumulative(),
+                     DistributionData(12, 2, 2, 10))
+
+    d.update(900)
+    self.assertEqual(d.get_cumulative(),
+                     DistributionData(912, 3, 2, 900))
+
+  def test_integer_only(self):
+    d = DistributionCell()
+    d.update(3.1)
+    d.update(3.2)
+    d.update(3.3)
+    self.assertEqual(d.get_cumulative(),
+                     DistributionData(9, 3, 3, 3))
+
+
+class TestCellCommitState(unittest.TestCase):
+  def test_basic_path(self):
+    ds = CellCommitState()
+    # Starts dirty
+    self.assertTrue(ds.before_commit())
+    ds.after_commit()
+    self.assertFalse(ds.before_commit())
+
+    # Make it dirty again
+    ds.after_modification()
+    self.assertTrue(ds.before_commit())
+    ds.after_commit()
+    self.assertFalse(ds.before_commit())
+
+    # Dirty again
+    ds.after_modification()
+    self.assertTrue(ds.before_commit())
+    ds.after_modification()
+    ds.after_commit()
+    self.assertTrue(ds.before_commit())
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/execution.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py
new file mode 100644
index 0000000..8f04b7b
--- /dev/null
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -0,0 +1,210 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Internal classes for Metrics API.
+
+The classes in this file keep shared state, and organize metrics information.
+
+Available classes:
+- MetricKey - Internal key for a metric.
+- MetricResult - Current status of a metric's updates/commits.
+- MetricsEnvironment - Keeps track of MetricsContainer and other metrics
+    information for every single execution working thread.
+- MetricsContainer - Holds the metrics of a single step and a single
+    unit-of-commit (bundle).
+"""
+from collections import defaultdict
+import threading
+
+from apache_beam.metrics.cells import CounterCell, DistributionCell
+
+
+class MetricKey(object):
+  """Key used to identify instance of metric cell.
+
+  Metrics are internally keyed by the step name they are associated with and
+  the name of the metric.
+  """
+  def __init__(self, step, metric):
+    """Initializes ``MetricKey``.
+
+    Args:
+      step: A string with the step this metric cell is part of.
+      metric: A ``MetricName`` that identifies a metric.
+    """
+    self.step = step
+    self.metric = metric
+
+  def __eq__(self, other):
+    return (self.step == other.step and
+            self.metric == other.metric)
+
+  def __str__(self):
+    return 'MetricKey(step={}, metric={})'.format(
+        self.step, self.metric)
+
+  def __hash__(self):
+    return hash((self.step, self.metric))
+
+
+class MetricResult(object):
+  """Keeps track of the status of a metric within a single bundle.
+
+  It contains the physical and logical updates to the metric. Physical updates
+  are updates that have not necessarily been committed, but that have been made
+  during pipeline execution. Logical updates are updates that have been
+  committed.
+
+  Attributes:
+    key: A ``MetricKey`` that identifies the metric and bundle of this result.
+    committed: The committed updates of the metric. This attribute's type is
+      that of the underlying cell data (e.g. int, DistributionData).
+    attempted: The physical updates of the metric. This attribute's type is
+      that of the underlying cell data (e.g. int, DistributionData).
+  """
+  def __init__(self, key, committed, attempted):
+    """Initializes ``MetricResult``.
+    Args:
+      key: A ``MetricKey`` object.
+      committed: Metric data that has been committed (e.g. logical updates)
+      attempted: Metric data that has been attempted (e.g. physical updates)
+    """
+    self.key = key
+    self.committed = committed
+    self.attempted = attempted
+
+  def __eq__(self, other):
+    return (self.key == other.key and
+            self.committed == other.committed and
+            self.attempted == other.attempted)
+
+  def __str__(self):
+    return 'MetricResult(key={}, committed={}, attempted={})'.format(
+        self.key, self.committed, self.attempted)
+
+
+class MetricsEnvironment(object):
+  """Holds the MetricsContainer for every thread and other metric information.
+
+  This class is not meant to be instantiated, instead being used to keep
+  track of global state.
+  """
+  METRICS_SUPPORTED = False
+  _METRICS_SUPPORTED_LOCK = threading.Lock()
+
+  PER_THREAD = threading.local()
+
+  @classmethod
+  def set_metrics_supported(cls, supported):
+    with cls._METRICS_SUPPORTED_LOCK:
+      cls.METRICS_SUPPORTED = supported
+
+  @classmethod
+  def current_container(cls):
+    try:
+      return cls.PER_THREAD.container
+    except AttributeError:
+      return None
+
+  @classmethod
+  def set_current_container(cls, container):
+    cls.PER_THREAD.container = container
+
+  @classmethod
+  def unset_current_container(cls):
+    del cls.PER_THREAD.container
+
+
+class MetricsContainer(object):
+  """Holds the metrics of a single step and a single bundle."""
+  def __init__(self, step_name):
+    self.step_name = step_name
+    self.counters = defaultdict(lambda: CounterCell())
+    self.distributions = defaultdict(lambda: DistributionCell())
+
+  def get_counter(self, metric_name):
+    return self.counters[metric_name]
+
+  def get_distribution(self, metric_name):
+    return self.distributions[metric_name]
+
+  def _get_updates(self, filter=None):
+    """Return cumulative values of metrics filtered according to a lambda.
+
+    This returns all the cumulative values for all metrics after filtering
+    them with the filter parameter lambda function. If None is passed in,
+    then cumulative values for all metrics are returned.
+    """
+    if filter is None:
+      filter = lambda v: True
+    counters = {MetricKey(self.step_name, k): v.get_cumulative()
+                for k, v in self.counters.items()
+                if filter(v)}
+
+    distributions = {MetricKey(self.step_name, k): v.get_cumulative()
+                     for k, v in self.distributions.items()
+                     if filter(v)}
+
+    return MetricUpdates(counters, distributions)
+
+  def get_updates(self):
+    """Return cumulative values of metrics that changed since the last commit.
+
+    This returns all the cumulative values for all metrics only if their state
+    prior to the function call was COMMITTING or DIRTY.
+    """
+    return self._get_updates(filter=lambda v: v.commit.before_commit())
+
+  def get_cumulative(self):
+    """Return MetricUpdates with cumulative values of all metrics in container.
+
+    This returns all the cumulative values for all metrics regardless of whether
+    they have been committed or not.
+    """
+    return self._get_updates()
+
+
+class ScopedMetricsContainer(object):
+  def __init__(self, container):
+    self._old_container = MetricsEnvironment.current_container()
+    self._container = container
+
+  def __enter__(self):
+    MetricsEnvironment.set_current_container(self._container)
+    return self._container
+
+  def __exit__(self, type, value, traceback):
+    MetricsEnvironment.set_current_container(self._old_container)
+
+
+class MetricUpdates(object):
+  """Contains updates for several metrics.
+
+  A metric update is an object containing information to update a metric.
+  For Distribution metrics, it is DistributionData, and for Counter metrics,
+  it's an int.
+  """
+  def __init__(self, counters=None, distributions=None):
+    """Create a MetricUpdates object.
+
+    Args:
+      counters: Dictionary of MetricKey:MetricUpdate updates.
+      distributions: Dictionary of MetricKey:MetricUpdate objects.
+    """
+    self.counters = counters or {}
+    self.distributions = distributions or {}

http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/execution_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/execution_test.py b/sdks/python/apache_beam/metrics/execution_test.py
new file mode 100644
index 0000000..54569c1
--- /dev/null
+++ b/sdks/python/apache_beam/metrics/execution_test.py
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from apache_beam.metrics.cells import CellCommitState
+from apache_beam.metrics.execution import MetricsContainer
+from apache_beam.metrics.execution import ScopedMetricsContainer
+from apache_beam.metrics.execution import MetricsEnvironment
+from apache_beam.metrics.execution import MetricKey
+from apache_beam.metrics.metric import Metrics
+from apache_beam.metrics.metricbase import MetricName
+
+
+class TestMetricsContainer(unittest.TestCase):
+  def test_create_new_counter(self):
+    mc = MetricsContainer('astep')
+    self.assertFalse(mc.counters.has_key(MetricName('namespace', 'name')))
+    mc.get_counter(MetricName('namespace', 'name'))
+    self.assertTrue(mc.counters.has_key(MetricName('namespace', 'name')))
+
+  def test_scoped_container(self):
+    c1 = MetricsContainer('mystep')
+    c2 = MetricsContainer('myinternalstep')
+    with ScopedMetricsContainer(c1):
+      self.assertEqual(c1, MetricsEnvironment.current_container())
+      counter = Metrics.counter('ns', 'name')
+      counter.inc(2)
+
+      with ScopedMetricsContainer(c2):
+        self.assertEqual(c2, MetricsEnvironment.current_container())
+        counter = Metrics.counter('ns', 'name')
+        counter.inc(3)
+        self.assertEqual(
+            c2.get_cumulative().counters.items(),
+            [(MetricKey('myinternalstep', MetricName('ns', 'name')), 3)])
+
+      self.assertEqual(c1, MetricsEnvironment.current_container())
+      counter = Metrics.counter('ns', 'name')
+      counter.inc(4)
+      self.assertEqual(
+          c1.get_cumulative().counters.items(),
+          [(MetricKey('mystep', MetricName('ns', 'name')), 6)])
+
+  def test_add_to_counter(self):
+    mc = MetricsContainer('astep')
+    counter = mc.get_counter(MetricName('namespace', 'name'))
+    counter.inc()
+    counter = mc.get_counter(MetricName('namespace', 'name'))
+    self.assertEqual(counter.value, 1)
+
+  def test_get_cumulative_or_updates(self):
+    mc = MetricsContainer('astep')
+
+    clean_values = []
+    dirty_values = []
+    for i in range(1, 11):
+      counter = mc.get_counter(MetricName('namespace', 'name{}'.format(i)))
+      distribution = mc.get_distribution(
+          MetricName('namespace', 'name{}'.format(i)))
+      counter.inc(i)
+      distribution.update(i)
+      if i % 2 == 0:
+        # Some are left to be DIRTY (i.e. not yet committed).
+        # Some are left to be CLEAN (i.e. already committed).
+        dirty_values.append(i)
+        continue
+      # Assert: Counter/Distribution is DIRTY or COMMITTING (not CLEAN)
+      self.assertEqual(distribution.commit.before_commit(), True)
+      self.assertEqual(counter.commit.before_commit(), True)
+      distribution.commit.after_commit()
+      counter.commit.after_commit()
+      # Assert: Counter/Distribution has been committed, therefore it's CLEAN
+      self.assertEqual(counter.commit.state, CellCommitState.CLEAN)
+      self.assertEqual(distribution.commit.state, CellCommitState.CLEAN)
+      clean_values.append(i)
+
+    # Retrieve NON-COMMITTED updates.
+    logical = mc.get_updates()
+    self.assertEqual(len(logical.counters), 5)
+    self.assertEqual(len(logical.distributions), 5)
+    self.assertEqual(set(dirty_values),
+                     set([v for _, v in logical.counters.items()]))
+    # Retrieve ALL updates.
+    cumulative = mc.get_cumulative()
+    self.assertEqual(len(cumulative.counters), 10)
+    self.assertEqual(len(cumulative.distributions), 10)
+    self.assertEqual(set(dirty_values + clean_values),
+                     set([v for _, v in cumulative.counters.items()]))
+
+
+class TestMetricsEnvironment(unittest.TestCase):
+  def test_uses_right_container(self):
+    c1 = MetricsContainer('step1')
+    c2 = MetricsContainer('step2')
+    counter = Metrics.counter('ns', 'name')
+    MetricsEnvironment.set_current_container(c1)
+    counter.inc()
+    MetricsEnvironment.set_current_container(c2)
+    counter.inc(3)
+    MetricsEnvironment.unset_current_container()
+
+    self.assertEqual(
+        c1.get_cumulative().counters.items(),
+        [(MetricKey('step1', MetricName('ns', 'name')), 1)])
+
+    self.assertEqual(
+        c2.get_cumulative().counters.items(),
+        [(MetricKey('step2', MetricName('ns', 'name')), 3)])
+
+  def test_no_container(self):
+    self.assertEqual(MetricsEnvironment.current_container(),
+                     None)
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/metric.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py
new file mode 100644
index 0000000..13ca77b
--- /dev/null
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -0,0 +1,165 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+User-facing classes for Metrics API.
+
+The classes in this file allow users to define and use metrics to be collected
+and displayed as part of their pipeline execution.
+
+- Metrics - This class lets pipeline and transform writers create and access
+    metric objects such as counters, distributions, etc.
+"""
+import inspect
+
+from apache_beam.metrics.execution import MetricsEnvironment
+from apache_beam.metrics.metricbase import Counter, Distribution
+from apache_beam.metrics.metricbase import MetricName
+
+
+class Metrics(object):
+  """Factory of the metric objects users update during pipeline execution.
+
+  The objects handed out do not store values themselves: every update is
+  forwarded to the container that ``MetricsEnvironment`` reports as current
+  at the time of the call, and is dropped when there is no such container.
+  """
+
+  @staticmethod
+  def get_namespace(namespace):
+    """Normalizes a namespace given as a class or as a string.
+
+    Args:
+      namespace: A class or a ``str``.
+
+    Returns:
+      ``'<module>.<class name>'`` for a class, the string itself for a ``str``.
+
+    Raises:
+      ValueError: If ``namespace`` is neither a class nor a ``str``.
+    """
+    if inspect.isclass(namespace):
+      return '{}.{}'.format(namespace.__module__, namespace.__name__)
+    if isinstance(namespace, str):
+      return namespace
+    raise ValueError('Unknown namespace type')
+
+  @staticmethod
+  def counter(namespace, name):
+    """Builds a Counter metric for the given namespace and name.
+
+    Args:
+      namespace: A class or string that gives the namespace to a metric
+      name: A string that gives a unique name to a metric
+
+    Returns:
+      A Counter object.
+    """
+    metric_name = MetricName(Metrics.get_namespace(namespace), name)
+    return Metrics.DelegatingCounter(metric_name)
+
+  @staticmethod
+  def distribution(namespace, name):
+    """Builds a Distribution metric for the given namespace and name.
+
+    Distribution metrics are restricted to integer-only distributions.
+
+    Args:
+      namespace: A class or string that gives the namespace to a metric
+      name: A string that gives a unique name to a metric
+
+    Returns:
+      A Distribution object.
+    """
+    metric_name = MetricName(Metrics.get_namespace(namespace), name)
+    return Metrics.DelegatingDistribution(metric_name)
+
+  class DelegatingCounter(Counter):
+    """Counter that forwards increments to the current metrics container."""
+
+    def __init__(self, metric_name):
+      self.metric_name = metric_name
+
+    def inc(self, n=1):
+      """Adds ``n`` to this counter in the current container, if any."""
+      current = MetricsEnvironment.current_container()
+      if current is None:
+        # Nothing is collecting metrics right now; the update is dropped.
+        return
+      current.get_counter(self.metric_name).inc(n)
+
+  class DelegatingDistribution(Distribution):
+    """Distribution that forwards updates to the current metrics container."""
+
+    def __init__(self, metric_name):
+      self.metric_name = metric_name
+
+    def update(self, value):
+      """Records ``value`` for this distribution in the current container."""
+      current = MetricsEnvironment.current_container()
+      if current is None:
+        # Nothing is collecting metrics right now; the update is dropped.
+        return
+      current.get_distribution(self.metric_name).update(value)
+
+
+class MetricResults(object):
+  """Base class for objects that answer queries about metric values."""
+
+  @staticmethod
+  def matches(filter, metric_key):
+    """Tells whether a metric key is accepted by a filter.
+
+    Args:
+      filter: An object exposing ``steps``, ``namespaces`` and ``names``
+        collections, or None to accept every key.
+      metric_key: An object exposing ``step`` and ``metric``; the latter
+        exposes ``namespace`` and ``name``.
+
+    Returns:
+      True when ``filter`` is None, or when the key's step, namespace and name
+      are each contained in the matching collection of the filter. An empty
+      collection in the filter therefore accepts nothing.
+    """
+    if filter is None:
+      return True
+
+    accepted = (metric_key.step in filter.steps and
+                metric_key.metric.namespace in filter.namespaces and
+                metric_key.metric.name in filter.names)
+    return accepted
+
+  def query(self, filter):
+    """Returns the metrics accepted by ``filter``; subclasses implement it."""
+    raise NotImplementedError
+
+
+class MetricsFilter(object):
+  """Simple object to filter metrics results.
+
+  It filters by matching a result's step-namespace-name with three internal
+  sets. No execution/matching logic is added to this object, so that it may
+  be used to construct arguments as an RPC request. It is left for runners
+  to implement matching logic by themselves.
+
+  Every ``with_*`` method adds to the corresponding set and returns the filter
+  itself, so calls can be chained.
+  """
+  def __init__(self):
+    self._names = set()
+    self._namespaces = set()
+    self._steps = set()
+
+  @property
+  def steps(self):
+    """Read-only snapshot of the step names added so far."""
+    return frozenset(self._steps)
+
+  @property
+  def names(self):
+    """Read-only snapshot of the metric names added so far."""
+    return frozenset(self._names)
+
+  @property
+  def namespaces(self):
+    """Read-only snapshot of the namespaces added so far."""
+    return frozenset(self._namespaces)
+
+  def with_name(self, name):
+    """Adds a single metric name to the filter and returns the filter."""
+    return self.with_names([name])
+
+  def with_names(self, names):
+    """Adds several metric names to the filter and returns the filter.
+
+    Args:
+      names: An iterable of metric names; a bare string is rejected because
+        it would be added character by character.
+
+    Raises:
+      ValueError: If ``names`` is a string.
+    """
+    if isinstance(names, str):
+      raise ValueError('Names must be an iterable, not a string')
+
+    # Names belong in the name set, not in the step set.
+    self._names.update(names)
+    return self
+
+  def with_namespace(self, namespace):
+    """Adds a single namespace to the filter and returns the filter."""
+    return self.with_namespaces([namespace])
+
+  def with_namespaces(self, namespaces):
+    """Adds several namespaces to the filter and returns the filter.
+
+    Args:
+      namespaces: An iterable of namespaces. Each one is normalized with
+        ``Metrics.get_namespace`` before being stored.
+
+    Raises:
+      ValueError: If ``namespaces`` is a string.
+    """
+    if isinstance(namespaces, str):
+      raise ValueError('Namespaces must be an iterable, not a string')
+
+    self._namespaces.update([Metrics.get_namespace(ns) for ns in namespaces])
+    return self
+
+  def with_step(self, step):
+    """Adds a single step name to the filter and returns the filter."""
+    return self.with_steps([step])
+
+  def with_steps(self, steps):
+    """Adds several step names to the filter and returns the filter.
+
+    Args:
+      steps: An iterable of step names; a bare string is rejected because it
+        would be added character by character.
+
+    Raises:
+      ValueError: If ``steps`` is a string.
+    """
+    if isinstance(steps, str):
+      raise ValueError('Steps must be an iterable, not a string')
+
+    self._steps.update(steps)
+    return self

http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/metric_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metric_test.py 
b/sdks/python/apache_beam/metrics/metric_test.py
new file mode 100644
index 0000000..c478a85
--- /dev/null
+++ b/sdks/python/apache_beam/metrics/metric_test.py
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from apache_beam.metrics.cells import DistributionData
+from apache_beam.metrics.execution import MetricKey
+from apache_beam.metrics.execution import MetricsContainer
+from apache_beam.metrics.execution import MetricsEnvironment
+from apache_beam.metrics.metric import Metrics
+from apache_beam.metrics.metricbase import MetricName
+
+
+class NameTest(unittest.TestCase):
+  """Tests attribute access and equality of MetricName and MetricKey."""
+
+  def test_basic_metric_name(self):
+    """Names and keys expose their parts and compare equal by value."""
+    name = MetricName('namespace1', 'name1')
+    self.assertEqual(name.namespace, 'namespace1')
+    self.assertEqual(name.name, 'name1')
+    self.assertEqual(name, MetricName('namespace1', 'name1'))
+
+    key = MetricKey('step1', name)
+    self.assertEqual(key.step, 'step1')
+    self.assertEqual(key.metric.namespace, 'namespace1')
+    self.assertEqual(key.metric.name, 'name1')
+    self.assertEqual(key, MetricKey('step1', MetricName('namespace1', 'name1')))
+
+
+class MetricsTest(unittest.TestCase):
+  """Tests for the user-facing ``Metrics`` factory."""
+
+  def test_get_namespace_class(self):
+    """A class namespace becomes '<module>.<class name>'."""
+    class MyClass(object):
+      pass
+
+    self.assertEqual('{}.{}'.format(MyClass.__module__, MyClass.__name__),
+                     Metrics.get_namespace(MyClass))
+
+  def test_get_namespace_string(self):
+    """A string namespace is returned unchanged."""
+    namespace = 'MyNamespace'
+    self.assertEqual(namespace, Metrics.get_namespace(namespace))
+
+  def test_get_namespace_error(self):
+    """Anything that is neither a class nor a string is rejected."""
+    with self.assertRaises(ValueError):
+      Metrics.get_namespace(object())
+
+  def test_create_counter_distribution(self):
+    """Updates made through delegating metrics land in the container."""
+    MetricsEnvironment.set_current_container(MetricsContainer('mystep'))
+    # The current container is process-wide test state; remove it again so it
+    # does not leak into tests that run after this one.
+    self.addCleanup(MetricsEnvironment.unset_current_container)
+    counter_ns = 'aCounterNamespace'
+    distro_ns = 'aDistributionNamespace'
+    name = 'aName'
+    counter = Metrics.counter(counter_ns, name)
+    distro = Metrics.distribution(distro_ns, name)
+    counter.inc(10)
+    counter.dec(3)
+    distro.update(10)
+    distro.update(2)
+    self.assertTrue(isinstance(counter, Metrics.DelegatingCounter))
+    self.assertTrue(isinstance(distro, Metrics.DelegatingDistribution))
+
+    # The values must live in the container, not in the delegating objects.
+    del distro
+    del counter
+
+    container = MetricsEnvironment.current_container()
+    self.assertEqual(
+        container.counters[MetricName(counter_ns, name)].get_cumulative(),
+        7)
+    self.assertEqual(
+        container.distributions[MetricName(distro_ns, name)].get_cumulative(),
+        DistributionData(12, 2, 2, 10))
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/metricbase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metricbase.py 
b/sdks/python/apache_beam/metrics/metricbase.py
new file mode 100644
index 0000000..1ad6962
--- /dev/null
+++ b/sdks/python/apache_beam/metrics/metricbase.py
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+The classes in this file are interfaces for metrics. They are not intended
+to be subclassed or created directly by users. To work with and access metrics,
+users should use the classes and methods exposed in metric.py.
+
+Available classes:
+- Metric - Base interface of a metrics object.
+- Counter - Counter metric interface. Allows a count to be incremented or
+    decremented during pipeline execution.
+- Distribution - Distribution Metric interface. Allows statistics about the
+    distribution of a variable to be collected during pipeline execution.
+- MetricName - Namespace and name used to refer to a Metric.
+"""
+
+
+class MetricName(object):
+  """The name of a metric.
+
+  The name of a metric consists of a namespace and a name. The namespace
+  allows grouping related metrics together and also prevents collisions
+  between multiple metrics of the same name.
+
+  Instances compare and hash by their (namespace, name) pair, so they can be
+  used as dictionary keys.
+  """
+  def __init__(self, namespace, name):
+    """Initializes ``MetricName``.
+
+    Args:
+      namespace: A string with the namespace of a metric.
+      name: A string with the name of a metric.
+    """
+    self.namespace = namespace
+    self.name = name
+
+  def __eq__(self, other):
+    """Two names are equal when both namespace and name are equal."""
+    return (self.namespace == other.namespace and
+            self.name == other.name)
+
+  def __ne__(self, other):
+    """Negation of ``__eq__``.
+
+    Python 2 does not derive ``!=`` from ``__eq__``; without this method two
+    equal names would still compare as different with ``!=``.
+    """
+    return not self == other
+
+  def __str__(self):
+    return 'MetricName(namespace={}, name={})'.format(
+        self.namespace, self.name)
+
+  def __hash__(self):
+    # Kept consistent with __eq__: equal names hash equally.
+    return hash((self.namespace, self.name))
+
+
+class Metric(object):
+  """Common base type of the metric interfaces declared in this module."""
+
+
+class Counter(Metric):
+  """Interface of a counter metric.
+
+  A counter's count can be incremented or decremented during pipeline
+  execution.
+  """
+
+  def inc(self, n=1):
+    """Increments the counter by ``n``; subclasses provide the behavior."""
+    raise NotImplementedError
+
+  def dec(self, n=1):
+    """Decrements the counter by ``n`` by delegating to ``inc`` with ``-n``."""
+    negated = -n
+    self.inc(negated)
+
+
+class Distribution(Metric):
+  """Distribution Metric interface. Allows statistics about the
+    distribution of a variable to be collected during pipeline execution."""
+  def update(self, value):
+    # Interface method: concrete distributions override it to record `value`.
+    raise NotImplementedError

http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/direct/direct_metrics.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_metrics.py 
b/sdks/python/apache_beam/runners/direct/direct_metrics.py
new file mode 100644
index 0000000..9d23487
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/direct_metrics.py
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+DirectRunner implementation of MetricResults. It is in charge not only of
+responding to queries of current metrics, but also of keeping the common
+state consistent.
+"""
+from collections import defaultdict
+import threading
+
+from apache_beam.metrics.cells import CounterAggregator
+from apache_beam.metrics.cells import DistributionAggregator
+from apache_beam.metrics.execution import MetricKey
+from apache_beam.metrics.execution import MetricResult
+from apache_beam.metrics.metric import MetricResults
+
+
+class DirectMetrics(MetricResults):
+  """MetricResults implementation backed by per-key ``DirectMetric`` cells.
+
+  Cells are created on first access to a key: counters aggregate with a
+  ``CounterAggregator`` and distributions with a ``DistributionAggregator``.
+  """
+
+  def __init__(self):
+    self._counters = defaultdict(
+        lambda: DirectMetric(CounterAggregator()))
+    self._distributions = defaultdict(
+        lambda: DirectMetric(DistributionAggregator()))
+
+  def _apply_operation(self, bundle, updates, op):
+    """Calls ``op(cell, bundle, value)`` for every entry of ``updates``.
+
+    Counter entries are processed first, then distribution entries.
+    """
+    for key, value in updates.counters.items():
+      op(self._counters[key], bundle, value)
+
+    for key, value in updates.distributions.items():
+      op(self._distributions[key], bundle, value)
+
+  def commit_logical(self, bundle, updates):
+    """Forwards every update to its cell's ``commit_logical``."""
+    self._apply_operation(
+        bundle, updates,
+        lambda cell, b, value: cell.commit_logical(b, value))
+
+  def commit_physical(self, bundle, updates):
+    """Forwards every update to its cell's ``commit_physical``."""
+    self._apply_operation(
+        bundle, updates,
+        lambda cell, b, value: cell.commit_physical(b, value))
+
+  def update_physical(self, bundle, updates):
+    """Forwards every update to its cell's ``update_physical``."""
+    self._apply_operation(
+        bundle, updates,
+        lambda cell, b, value: cell.update_physical(b, value))
+
+  def query(self, filter=None):
+    """Returns the committed and attempted values of the matching metrics.
+
+    Args:
+      filter: Passed to ``matches`` together with each key; None accepts all.
+
+    Returns:
+      A dict with the keys ``'counters'`` and ``'distributions'``, each holding
+      a list of ``MetricResult`` objects.
+    """
+    def results_for(cells):
+      return [MetricResult(MetricKey(key.step, key.metric),
+                           cell.extract_committed(),
+                           cell.extract_latest_attempted())
+              for key, cell in cells.items()
+              if self.matches(filter, key)]
+
+    counters = results_for(self._counters)
+    distributions = results_for(self._distributions)
+    return {'counters': counters,
+            'distributions': distributions}
+
+
+class DirectMetric(object):
+  """Keeps a consistent state for a single metric.
+
+  It keeps track of the metric's physical (attempted) and logical (committed)
+  updates. Attempted state (``finished_attempted``, ``inflight_attempted``) is
+  only touched while holding ``_attempted_lock``; committed state
+  (``finished_committed``) while holding ``_committed_lock``.
+
+  The aggregator must provide ``zero()``, ``combine(a, b)`` and ``result(x)``.
+  """
+  def __init__(self, aggregator):
+    self.aggregator = aggregator
+    self._attempted_lock = threading.Lock()
+    self.finished_attempted = aggregator.zero()
+    self.inflight_attempted = {}
+    self._committed_lock = threading.Lock()
+    self.finished_committed = aggregator.zero()
+
+  def commit_logical(self, bundle, update):
+    """Folds ``update`` into the committed value; ``bundle`` is not used."""
+    with self._committed_lock:
+      self.finished_committed = self.aggregator.combine(
+          update, self.finished_committed)
+
+  def commit_physical(self, bundle, update):
+    """Folds ``update`` into the finished attempted value.
+
+    Any in-flight entry recorded for ``bundle`` is discarded.
+    """
+    with self._attempted_lock:
+      self.inflight_attempted[bundle] = update
+      self.finished_attempted = self.aggregator.combine(
+          update, self.finished_attempted)
+      del self.inflight_attempted[bundle]
+
+  def update_physical(self, bundle, update):
+    """Records ``update`` as the in-flight attempted value of ``bundle``."""
+    # Same lock as commit_physical: both mutate inflight_attempted.
+    with self._attempted_lock:
+      self.inflight_attempted[bundle] = update
+
+  def extract_committed(self):
+    """Returns the aggregator's result for the committed value."""
+    with self._committed_lock:
+      committed = self.finished_committed
+    return self.aggregator.result(committed)
+
+  def extract_latest_attempted(self):
+    """Returns the result of the finished plus all in-flight attempted values."""
+    # Snapshot under the lock so that an update being moved from in-flight to
+    # finished by commit_physical is never seen in both places.
+    with self._attempted_lock:
+      res = self.finished_attempted
+      inflight = list(self.inflight_attempted.values())
+
+    for u in inflight:
+      res = self.aggregator.combine(res, u)
+
+    return self.aggregator.result(res)

http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/direct/direct_metrics_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_metrics_test.py 
b/sdks/python/apache_beam/runners/direct/direct_metrics_test.py
new file mode 100644
index 0000000..256b91f
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/direct_metrics_test.py
@@ -0,0 +1,211 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+import hamcrest as hc
+
+from apache_beam.metrics.metricbase import MetricName
+from apache_beam.metrics.execution import MetricUpdates
+from apache_beam.metrics.execution import MetricResult
+from apache_beam.metrics.execution import MetricKey
+from apache_beam.metrics.cells import DistributionData
+from apache_beam.metrics.cells import DistributionResult
+from apache_beam.runners.direct.direct_metrics import DirectMetrics
+
+
+class DirectMetricsTest(unittest.TestCase):
+  """Tests committed and attempted bookkeeping of DirectMetrics."""
+  name1 = MetricName('namespace1', 'name1')
+  name2 = MetricName('namespace1', 'name2')
+  name3 = MetricName('namespace2', 'name1')
+
+  bundle1 = object() # For this test, any object can be a bundle
+  bundle2 = object()
+
+  def test_combiner_functions(self):
+    """Exercises single counter and distribution cells directly.
+
+    A None update is expected to raise TypeError for a counter cell and
+    AttributeError for a distribution cell.
+    """
+    metrics = DirectMetrics()
+    counter = metrics._counters['anykey']
+    counter.commit_logical(self.bundle1, 5)
+    self.assertEqual(counter.extract_committed(), 5)
+    with self.assertRaises(TypeError):
+      counter.commit_logical(self.bundle1, None)
+
+    distribution = metrics._distributions['anykey']
+    distribution.commit_logical(self.bundle1, DistributionData(4, 1, 4, 4))
+    self.assertEqual(distribution.extract_committed(),
+                     DistributionResult(DistributionData(4, 1, 4, 4)))
+
+    with self.assertRaises(AttributeError):
+      distribution.commit_logical(self.bundle1, None)
+
+  def test_commit_logical_no_filter(self):
+    """Logical commits accumulate per key; attempted values stay at zero."""
+    metrics = DirectMetrics()
+    metrics.commit_logical(
+        self.bundle1,
+        MetricUpdates(
+            counters={MetricKey('step1', self.name1): 5,
+                      MetricKey('step1', self.name2): 8},
+            distributions={
+                MetricKey('step1', self.name1): DistributionData(8, 2, 3, 5)}))
+
+    metrics.commit_logical(
+        self.bundle1,
+        MetricUpdates(
+            counters={MetricKey('step2', self.name1): 7,
+                      MetricKey('step1', self.name2): 4},
+            distributions={
+                MetricKey('step1', self.name1): DistributionData(4, 1, 4, 4)}))
+
+    # query() without a filter returns every metric seen so far.
+    results = metrics.query()
+    hc.assert_that(
+        results['counters'],
+        hc.contains_inanyorder(*[
+            MetricResult(MetricKey('step1', self.name2), 12, 0),
+            MetricResult(MetricKey('step2', self.name1), 7, 0),
+            MetricResult(MetricKey('step1', self.name1), 5, 0)]))
+    hc.assert_that(
+        results['distributions'],
+        hc.contains_inanyorder(
+            MetricResult(MetricKey('step1', self.name1),
+                         DistributionResult(
+                             DistributionData(12, 3, 3, 5)),
+                         DistributionResult(
+                             DistributionData(0, 0, None, None)))))
+
+  def test_apply_physical_no_filter(self):
+    """In-flight physical updates of distinct bundles add up per key."""
+    metrics = DirectMetrics()
+    metrics.update_physical(object(),
+                            MetricUpdates(
+                                counters={MetricKey('step1', self.name1): 5,
+                                          MetricKey('step1', self.name3): 8}))
+
+    metrics.update_physical(object(),
+                            MetricUpdates(
+                                counters={MetricKey('step2', self.name1): 7,
+                                          MetricKey('step1', self.name3): 4}))
+    results = metrics.query()
+    hc.assert_that(results['counters'],
+                   hc.contains_inanyorder(*[
+                       MetricResult(MetricKey('step1', self.name1), 0, 5),
+                       MetricResult(MetricKey('step1', self.name3), 0, 12),
+                       MetricResult(MetricKey('step2', self.name1), 0, 7)]))
+
+    # Committing an empty set of updates must leave the results unchanged.
+    metrics.commit_physical(object(), MetricUpdates())
+    results = metrics.query()
+    hc.assert_that(results['counters'],
+                   hc.contains_inanyorder(*[
+                       MetricResult(MetricKey('step1', self.name1), 0, 5),
+                       MetricResult(MetricKey('step1', self.name3), 0, 12),
+                       MetricResult(MetricKey('step2', self.name1), 0, 7)]))
+
+  def test_apply_physical_logical(self):
+    """Physical and logical commits are tracked independently of each other."""
+    metrics = DirectMetrics()
+    dist_zero = DistributionData(0, 0, None, None)
+    # Step 1: in-flight physical updates only show up as attempted values.
+    metrics.update_physical(
+        object(),
+        MetricUpdates(
+            counters={MetricKey('step1', self.name1): 7,
+                      MetricKey('step1', self.name2): 5,
+                      MetricKey('step2', self.name1): 1},
+            distributions={MetricKey('step1', self.name1):
+                           DistributionData(3, 1, 3, 3),
+                           MetricKey('step2', self.name3):
+                           DistributionData(8, 2, 4, 4)}))
+    results = metrics.query()
+    hc.assert_that(results['counters'],
+                   hc.contains_inanyorder(*[
+                       MetricResult(MetricKey('step1', self.name1), 0, 7),
+                       MetricResult(MetricKey('step1', self.name2), 0, 5),
+                       MetricResult(MetricKey('step2', self.name1), 0, 1)]))
+    hc.assert_that(results['distributions'],
+                   hc.contains_inanyorder(*[
+                       MetricResult(
+                           MetricKey('step1', self.name1),
+                           DistributionResult(dist_zero),
+                           DistributionResult(DistributionData(3, 1, 3, 3))),
+                       MetricResult(
+                           MetricKey('step2', self.name3),
+                           DistributionResult(dist_zero),
+                           DistributionResult(DistributionData(8, 2, 4, 4)))]))
+
+    # Step 2: a physical commit for another bundle is added to the attempted
+    # values on top of the updates that are still in flight.
+    metrics.commit_physical(
+        object(),
+        MetricUpdates(
+            counters={MetricKey('step1', self.name1): -3,
+                      MetricKey('step2', self.name1): -5},
+            distributions={MetricKey('step1', self.name1):
+                           DistributionData(8, 4, 1, 5),
+                           MetricKey('step2', self.name2):
+                           DistributionData(8, 8, 1, 1)}))
+    results = metrics.query()
+    hc.assert_that(results['counters'],
+                   hc.contains_inanyorder(*[
+                       MetricResult(MetricKey('step1', self.name1), 0, 4),
+                       MetricResult(MetricKey('step1', self.name2), 0, 5),
+                       MetricResult(MetricKey('step2', self.name1), 0, -4)]))
+    hc.assert_that(results['distributions'],
+                   hc.contains_inanyorder(*[
+                       MetricResult(
+                           MetricKey('step1', self.name1),
+                           DistributionResult(dist_zero),
+                           DistributionResult(DistributionData(11, 5, 1, 5))),
+                       MetricResult(
+                           MetricKey('step2', self.name3),
+                           DistributionResult(dist_zero),
+                           DistributionResult(DistributionData(8, 2, 4, 4))),
+                       MetricResult(
+                           MetricKey('step2', self.name2),
+                           DistributionResult(dist_zero),
+                           DistributionResult(DistributionData(8, 8, 1, 1)))]))
+
+    # Step 3: a logical commit only changes the committed values.
+    metrics.commit_logical(
+        object(),
+        MetricUpdates(
+            counters={MetricKey('step1', self.name1): 3,
+                      MetricKey('step1', self.name2): 5,
+                      MetricKey('step2', self.name1): -3},
+            distributions={MetricKey('step1', self.name1):
+                           DistributionData(11, 5, 1, 5),
+                           MetricKey('step2', self.name2):
+                           DistributionData(8, 8, 1, 1),
+                           MetricKey('step2', self.name3):
+                           DistributionData(4, 1, 4, 4)}))
+
+    results = metrics.query()
+    hc.assert_that(results['counters'],
+                   hc.contains_inanyorder(*[
+                       MetricResult(MetricKey('step1', self.name1), 3, 4),
+                       MetricResult(MetricKey('step1', self.name2), 5, 5),
+                       MetricResult(MetricKey('step2', self.name1), -3, -4)]))
+    hc.assert_that(results['distributions'],
+                   hc.contains_inanyorder(*[
+                       MetricResult(
+                           MetricKey('step1', self.name1),
+                           DistributionResult(DistributionData(11, 5, 1, 5)),
+                           DistributionResult(DistributionData(11, 5, 1, 5))),
+                       MetricResult(
+                           MetricKey('step2', self.name3),
+                           DistributionResult(DistributionData(4, 1, 4, 4)),
+                           DistributionResult(DistributionData(8, 2, 4, 4))),
+                       MetricResult(
+                           MetricKey('step2', self.name2),
+                           DistributionResult(DistributionData(8, 8, 1, 1)),
+                           DistributionResult(DistributionData(8, 8, 1, 1)))]))
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py 
b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 523eb05..a5c616b 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -27,6 +27,7 @@ import collections
 import logging
 
 from apache_beam.runners.direct.bundle_factory import BundleFactory
+from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.runners.runner import PipelineResult
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
@@ -52,6 +53,7 @@ class DirectRunner(PipelineRunner):
     from apache_beam.runners.direct.transform_evaluator import \
       TransformEvaluatorRegistry
 
+    MetricsEnvironment.set_metrics_supported(True)
     logging.info('Running pipeline with DirectRunner.')
     self.visitor = ConsumerTrackingPipelineVisitor()
     pipeline.visit(self.visitor)
@@ -152,6 +154,9 @@ class DirectPipelineResult(PipelineResult):
   def aggregated_values(self, aggregator_or_name):
     return self._evaluation_context.get_aggregator_values(aggregator_or_name)
 
+  def metrics(self):
+    return self._evaluation_context.metrics()
+
 
 class EagerRunner(DirectRunner):
 

http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py 
b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 48d353b..34701f5 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -26,6 +26,7 @@ from apache_beam.transforms import sideinputs
 from apache_beam.runners.direct.clock import Clock
 from apache_beam.runners.direct.watermark_manager import WatermarkManager
 from apache_beam.runners.direct.executor import TransformExecutor
+from apache_beam.runners.direct.direct_metrics import DirectMetrics
 from apache_beam.utils import counters
 
 
@@ -142,6 +143,7 @@ class EvaluationContext(object):
     self._pending_unblocked_tasks = []
     self._counter_factory = counters.CounterFactory()
     self._cache = None
+    self._metrics = DirectMetrics()
 
     self._lock = threading.Lock()
 
@@ -149,6 +151,10 @@ class EvaluationContext(object):
     assert not self._cache
     self._cache = cache
 
+  def metrics(self):
+    # TODO. Should this be made a @property?
+    return self._metrics
+
   @property
   def has_cache(self):
     return self._cache is not None
@@ -187,6 +193,9 @@ class EvaluationContext(object):
           completed_bundle, result.transform, completed_timers,
           committed_bundles, result.watermark_hold)
 
+      self._metrics.commit_logical(completed_bundle,
+                                   result.logical_metric_updates())
+
       # If the result is for a view, update side inputs container.
       if (result.output_bundles
           and result.output_bundles[0].pcollection in self.views):

http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py 
b/sdks/python/apache_beam/runners/direct/executor.py
index 378aecf..7e404f8 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -26,6 +26,9 @@ import threading
 import traceback
 from weakref import WeakValueDictionary
 
+from apache_beam.metrics.execution import MetricsContainer
+from apache_beam.metrics.execution import MetricsEnvironment
+
 
 class ExecutorService(object):
   """Thread pool for executing tasks in parallel."""
@@ -266,6 +269,8 @@ class TransformExecutor(ExecutorService.CallableTask):
   def __call__(self):
     self._call_count += 1
     assert self._call_count <= (1 + len(self._applied_transform.side_inputs))
+    metrics_container = MetricsContainer(self._applied_transform.full_label)
+    MetricsEnvironment.set_current_container(metrics_container)
 
     for side_input in self._applied_transform.side_inputs:
       if side_input not in self._side_input_values:
@@ -290,6 +295,7 @@ class TransformExecutor(ExecutorService.CallableTask):
           evaluator.process_element(value)
 
       result = evaluator.finish_bundle()
+      result.metric_updates = metrics_container.get_cumulative()
 
       if self._evaluation_context.has_cache:
         for uncommitted_bundle in result.output_bundles:
@@ -308,6 +314,9 @@ class TransformExecutor(ExecutorService.CallableTask):
       logging.warning('Task failed: %s', traceback.format_exc(), exc_info=True)
       self._completion_callback.handle_exception(e)
     finally:
+      self._evaluation_context.metrics().commit_physical(
+          self._input_bundle,
+          metrics_container.get_cumulative())
       self._transform_evaluation_state.complete(self)
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/direct/transform_result.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_result.py b/sdks/python/apache_beam/runners/direct/transform_result.py
index 298e629..59597dc 100644
--- a/sdks/python/apache_beam/runners/direct/transform_result.py
+++ b/sdks/python/apache_beam/runners/direct/transform_result.py
@@ -34,6 +34,10 @@ class TransformResult(object):
     self._watermark_hold = watermark_hold
     # Only used when caching (materializing) all values is requested.
     self._undeclared_tag_values = undeclared_tag_values
+    self.metric_updates = None
+
+  def logical_metric_updates(self):
+    return self.metric_updates
 
   @property
   def transform(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index ff6a22e..ea86061 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -26,6 +26,8 @@ from datetime import datetime
 import json
 import unittest
 
+import hamcrest as hc
+
 import apache_beam as beam
 
 from apache_beam.internal import apiclient
@@ -38,6 +40,12 @@ import apache_beam.transforms as ptransform
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.utils.pipeline_options import PipelineOptions
 
+from apache_beam.metrics.cells import DistributionData
+from apache_beam.metrics.cells import DistributionResult
+from apache_beam.metrics.execution import MetricResult
+from apache_beam.metrics.execution import MetricKey
+from apache_beam.metrics.metricbase import MetricName
+
 
 class RunnerTest(unittest.TestCase):
   default_properties = [
@@ -135,6 +143,41 @@ class RunnerTest(unittest.TestCase):
     self.assertEqual(len(disp_data), 3)
     self.assertEqual(disp_data, expected_data)
 
+  def test_direct_runner_metrics(self):
+    from apache_beam.metrics.metric import Metrics
+
+    class MyDoFn(beam.DoFn):
+      def process(self, context):
+        count = Metrics.counter(self.__class__, 'elements')
+        count.inc()
+        distro = Metrics.distribution(self.__class__, 'element-dist')
+        distro.update(context.element)
+        return [context.element]
+
+    runner = DirectRunner()
+    p = Pipeline(runner,
+                 options=PipelineOptions(self.default_properties))
+    # pylint: disable=expression-not-assigned
+    (p | 'create' >> ptransform.Create([1, 2, 3, 4, 5])
+     | 'do' >> beam.ParDo(MyDoFn()))
+    result = p.run()
+    metrics = result.metrics().query()
+    namespace = '{}.{}'.format(MyDoFn.__module__,
+                               MyDoFn.__name__)
+    hc.assert_that(
+        metrics['counters'],
+        hc.contains_inanyorder(
+            MetricResult(
+                MetricKey('do', MetricName(namespace, 'elements')),
+                5, 5)))
+    hc.assert_that(
+        metrics['distributions'],
+        hc.contains_inanyorder(
+            MetricResult(
+                MetricKey('do', MetricName(namespace, 'element-dist')),
+                DistributionResult(DistributionData(15, 5, 1, 5)),
+                DistributionResult(DistributionData(15, 5, 1, 5)))))
+
   def test_no_group_by_key_directly_after_bigquery(self):
     remote_runner = DataflowRunner()
     p = Pipeline(remote_runner,

Reply via email to