This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 24e6bf8  [BEAM-2732] Starting refactor of state tracking in Python (#4375)
24e6bf8 is described below

commit 24e6bf81790b97024fe1871575186d9db325bf1d
Author: Pablo <pabl...@users.noreply.github.com>
AuthorDate: Thu Jan 18 13:39:48 2018 -0800

    [BEAM-2732] Starting refactor of state tracking in Python (#4375)
    
    Also giving the Python-only state sampler full functionality.
---
 .../runners/portability/fn_api_runner_test.py      |   7 +-
 .../runners/portability/maptask_executor_runner.py |   6 +-
 .../apache_beam/runners/worker/bundle_processor.py |   7 +-
 .../apache_beam/runners/worker/statesampler.py     |  81 ++++++++++++
 .../runners/worker/statesampler_fake.py            |  51 --------
 .../{statesampler.pyx => statesampler_fast.pyx}    | 136 ++++++---------------
 .../runners/worker/statesampler_slow.py            |  76 ++++++++++++
 .../runners/worker/statesampler_test.py            |  48 +++++---
 sdks/python/apache_beam/utils/counters.py          |   3 -
 9 files changed, 225 insertions(+), 190 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 6304f71..83bb83a 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -23,13 +23,14 @@ import apache_beam as beam
 from apache_beam.runners.portability import fn_api_runner
 from apache_beam.runners.portability import maptask_executor_runner_test
 from apache_beam.runners.worker import sdk_worker
+from apache_beam.runners.worker import statesampler
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 from apache_beam.transforms import window
 
-try:
-  from apache_beam.runners.worker.statesampler import DEFAULT_SAMPLING_PERIOD_MS
-except ImportError:
+if statesampler.FAST_SAMPLER:
+  DEFAULT_SAMPLING_PERIOD_MS = statesampler.DEFAULT_SAMPLING_PERIOD_MS
+else:
   DEFAULT_SAMPLING_PERIOD_MS = 0
 
 
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
index 913ccd5..74c6b03 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
@@ -36,15 +36,11 @@ from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.worker import operation_specs
 from apache_beam.runners.worker import operations
+from apache_beam.runners.worker import statesampler
 from apache_beam.typehints import typehints
 from apache_beam.utils import profiler
 from apache_beam.utils.counters import CounterFactory
 
-try:
-  from apache_beam.runners.worker import statesampler
-except ImportError:
-  from apache_beam.runners.worker import statesampler_fake as statesampler
-
 # This module is experimental. No backwards-compatibility guarantees.
 
 
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 97c318b..9bc9056 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -38,6 +38,7 @@ from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners import pipeline_context
 from apache_beam.runners.worker import operation_specs
 from apache_beam.runners.worker import operations
+from apache_beam.runners.worker import statesampler
 from apache_beam.transforms import sideinputs
 from apache_beam.utils import counters
 from apache_beam.utils import proto_utils
@@ -46,12 +47,6 @@ from apache_beam.utils import urns
 # This module is experimental. No backwards-compatibility guarantees.
 
 
-try:
-  from apache_beam.runners.worker import statesampler
-except ImportError:
-  from apache_beam.runners.worker import statesampler_fake as statesampler
-
-
 DATA_INPUT_URN = 'urn:org.apache.beam:source:runner:0.1'
 DATA_OUTPUT_URN = 'urn:org.apache.beam:sink:runner:0.1'
 IDENTITY_DOFN_URN = 'urn:org.apache.beam:dofn:identity:0.1'
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py b/sdks/python/apache_beam/runners/worker/statesampler.py
new file mode 100644
index 0000000..03af644
--- /dev/null
+++ b/sdks/python/apache_beam/runners/worker/statesampler.py
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This module is experimental. No backwards-compatibility guarantees.
+from collections import namedtuple
+
+from apache_beam.utils.counters import Counter
+from apache_beam.utils.counters import CounterName
+
+try:
+  from apache_beam.runners.worker import statesampler_fast as statesampler_impl
+  FAST_SAMPLER = True
+except ImportError:
+  from apache_beam.runners.worker import statesampler_slow as statesampler_impl
+  FAST_SAMPLER = False
+
+
+StateSamplerInfo = namedtuple(
+    'StateSamplerInfo',
+    ['state_name', 'transition_count', 'time_since_transition'])
+
+
+# Default period for sampling current state of pipeline execution.
+DEFAULT_SAMPLING_PERIOD_MS = 200
+
+
+class StateSampler(statesampler_impl.StateSampler):
+
+  def __init__(self, prefix, counter_factory,
+               sampling_period_ms=DEFAULT_SAMPLING_PERIOD_MS):
+    self.states_by_name = {}
+    self._prefix = prefix
+    self._counter_factory = counter_factory
+    self._states_by_name = {}
+    self.sampling_period_ms = sampling_period_ms
+    super(StateSampler, self).__init__(sampling_period_ms)
+
+  def stop_if_still_running(self):
+    if self.started and not self.finished:
+      self.stop()
+
+  def get_info(self):
+    """Returns StateSamplerInfo with transition statistics."""
+    return StateSamplerInfo(
+        self.current_state().name,
+        self.state_transition_count,
+        self.time_since_transition)
+
+  def scoped_state(self, step_name, state_name, io_target=None):
+    counter_name = CounterName(state_name + '-msecs',
+                               stage_name=self._prefix,
+                               step_name=step_name,
+                               io_target=io_target)
+    if counter_name in self._states_by_name:
+      return self._states_by_name[counter_name]
+    else:
+      output_counter = self._counter_factory.get_counter(counter_name,
+                                                         Counter.SUM)
+      self._states_by_name[counter_name] = super(
+          StateSampler, self)._scoped_state(counter_name, output_counter)
+      return self._states_by_name[counter_name]
+
+  def commit_counters(self):
+    """Updates output counters with latest state statistics."""
+    for state in self._states_by_name.values():
+      state_msecs = int(1e-6 * state.nsecs)
+      state.counter.update(state_msecs - state.counter.value())
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fake.py b/sdks/python/apache_beam/runners/worker/statesampler_fake.py
deleted file mode 100644
index bc56021..0000000
--- a/sdks/python/apache_beam/runners/worker/statesampler_fake.py
+++ /dev/null
@@ -1,51 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# This module is experimental. No backwards-compatibility guarantees.
-
-
-class StateSampler(object):
-
-  def __init__(self, *args, **kwargs):
-    pass
-
-  def scoped_state(self, step_name, state_name=None, io_target=None):
-    return _FakeScopedState()
-
-  def start(self):
-    pass
-
-  def stop(self):
-    pass
-
-  def stop_if_still_running(self):
-    self.stop()
-
-  def commit_counters(self):
-    pass
-
-
-class _FakeScopedState(object):
-
-  def __enter__(self):
-    pass
-
-  def __exit__(self, *unused_args):
-    pass
-
-  def sampled_seconds(self):
-    return 0
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.pyx b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
similarity index 64%
rename from sdks/python/apache_beam/runners/worker/statesampler.pyx
rename to sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
index 1e37196..d0b1878 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler.pyx
+++ b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
@@ -34,13 +34,7 @@ thread queries the current state, the time spent since the previous sample is
 attributed to that state and accumulated.  Over time, this allows a granular
 runtime profile to be produced.
 """
-
 import threading
-import time
-
-
-from apache_beam.utils.counters import Counter
-from apache_beam.utils.counters import CounterName
 
 cimport cython
 from cpython cimport pythread
@@ -71,37 +65,14 @@ cdef inline int64_t get_nsec_time() nogil:
       current_time.tv_nsec)
 
 
-class StateSamplerInfo(object):
-  """Info for current state and transition statistics of StateSampler."""
-
-  def __init__(self, state_name, transition_count, time_since_transition):
-    self.state_name = state_name
-    self.transition_count = transition_count
-    self.time_since_transition = time_since_transition
-
-  def __repr__(self):
-    return ('<StateSamplerInfo state: %s time: %dns transitions: %d>'
-            % (self.state_name,
-               self.time_since_transition,
-               self.transition_count))
-
-
-# Default period for sampling current state of pipeline execution.
-DEFAULT_SAMPLING_PERIOD_MS = 200
-
-
 cdef class StateSampler(object):
   """Tracks time spent in states during pipeline execution."""
+  cdef int _sampling_period_ms
 
-  cdef object prefix
-  cdef object counter_factory
-  cdef int sampling_period_ms
-
-  cdef dict scoped_states_by_name
   cdef list scoped_states_by_index
 
-  cdef bint started
-  cdef bint finished
+  cdef public bint started
+  cdef public bint finished
   cdef object sampling_thread
 
   # This lock guards members that are shared between threads, specificaly
@@ -109,22 +80,16 @@ cdef class StateSampler(object):
   cdef pythread.PyThread_type_lock lock
 
   cdef public int64_t state_transition_count
-  cdef int64_t time_since_transition
+  cdef public int64_t time_since_transition
 
   cdef int32_t current_state_index
 
-  def __init__(self, prefix, counter_factory,
-      sampling_period_ms=DEFAULT_SAMPLING_PERIOD_MS):
-
-    # TODO(pabloem): Remove this once all dashed prefixes are removed from
-    # the worker.
-    # We stop using prefixes with included dash.
-    self.prefix = prefix[:-1] if prefix[-1] == '-' else prefix
-    self.counter_factory = counter_factory
-    self.sampling_period_ms = sampling_period_ms
+  def __init__(self, sampling_period_ms, *args):
+    self._sampling_period_ms = sampling_period_ms
+    self.started = False
+    self.finished = False
 
     self.lock = pythread.PyThread_allocate_lock()
-    self.scoped_states_by_name = {}
 
     self.current_state_index = 0
     self.time_since_transition = 0
@@ -132,7 +97,6 @@ cdef class StateSampler(object):
     unknown_state = ScopedState(self, 'unknown', self.current_state_index)
     pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
     self.scoped_states_by_index = [unknown_state]
-    self.finished = False
     pythread.PyThread_release_lock(self.lock)
 
     # Assert that the compiler correctly aligned the current_state field.  This
@@ -152,7 +116,7 @@ cdef class StateSampler(object):
     cdef int64_t latest_transition_count = self.state_transition_count
     with nogil:
       while True:
-        usleep(self.sampling_period_ms * 1000)
+        usleep(self._sampling_period_ms * 1000)
         pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
         try:
           if self.finished:
@@ -161,7 +125,7 @@ cdef class StateSampler(object):
           # Take an address as we can't create a reference to the scope
           # without the GIL.
           nsecs_ptr = &(<ScopedState>PyList_GET_ITEM(
-              self.scoped_states_by_index, self.current_state_index)).nsecs
+              self.scoped_states_by_index, self.current_state_index))._nsecs
           nsecs_ptr[0] += elapsed_nsecs
           if latest_transition_count != self.state_transition_count:
             self.time_since_transition = 0
@@ -186,64 +150,28 @@ cdef class StateSampler(object):
     # pythread doesn't support conditions.
     self.sampling_thread.join()
 
-  def stop_if_still_running(self):
-    if self.started and not self.finished:
-      self.stop()
-
-  def get_info(self):
-    """Returns StateSamplerInfo with transition statistics."""
-    return StateSamplerInfo(
-        self.scoped_states_by_index[self.current_state_index].name,
-        self.state_transition_count,
-        self.time_since_transition)
+  def current_state(self):
+    return self.scoped_states_by_index[self.current_state_index]
 
-  # TODO(pabloem): Make state_name required once all callers migrate,
-  #   and the legacy path is removed.
-  def scoped_state(self, step_name, state_name=None, io_target=None):
+  cpdef _scoped_state(self, counter_name, output_counter):
     """Returns a context manager managing transitions for a given state.
     Args:
-      step_name: A string with the name of the running step.
-      state_name: A string with the name of the state (e.g. 'process', 'start')
-      io_target: An IOTargetName object describing the io_target (e.g. writing
-        or reading to side inputs, shuffle or state). Will often be None.
+     TODO(pabloem)
 
     Returns:
       A ScopedState for the set of step-state-io_target.
     """
-    cdef ScopedState scoped_state
-    if state_name is None:
-      # If state_name is None, the worker is still using old style
-      # msec counters.
-      counter_name = '%s-%s-msecs' % (self.prefix, step_name)
-      scoped_state = self.scoped_states_by_name.get(counter_name, None)
-    else:
-      counter_name = CounterName(state_name + '-msecs',
-                                 stage_name=self.prefix,
-                                 step_name=step_name,
-                                 io_target=io_target)
-      scoped_state = self.scoped_states_by_name.get(counter_name, None)
-
-    if scoped_state is None:
-      output_counter = self.counter_factory.get_counter(counter_name,
-                                                        Counter.SUM)
-      new_state_index = len(self.scoped_states_by_index)
-      scoped_state = ScopedState(self, counter_name,
-                                 new_state_index, output_counter)
-      # Both scoped_states_by_index and scoped_state.nsecs are accessed
-      # by the sampling thread; initialize them under the lock.
-      pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
-      self.scoped_states_by_index.append(scoped_state)
-      scoped_state.nsecs = 0
-      pythread.PyThread_release_lock(self.lock)
-      self.scoped_states_by_name[counter_name] = scoped_state
+    new_state_index = len(self.scoped_states_by_index)
+    scoped_state = ScopedState(self, counter_name,
+                               new_state_index, output_counter)
+    # Both scoped_states_by_index and scoped_state.nsecs are accessed
+    # by the sampling thread; initialize them under the lock.
+    pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
+    self.scoped_states_by_index.append(scoped_state)
+    scoped_state._nsecs = 0
+    pythread.PyThread_release_lock(self.lock)
     return scoped_state
 
-  def commit_counters(self):
-    """Updates output counters with latest state statistics."""
-    for state in self.scoped_states_by_name.values():
-      state_msecs = int(1e-6 * state.nsecs)
-      state.counter.update(state_msecs - state.counter.value())
-
 
 cdef class ScopedState(object):
   """Context manager class managing transitions for a given sampler state."""
@@ -252,7 +180,7 @@ cdef class ScopedState(object):
   cdef readonly int32_t state_index
   cdef readonly object counter
   cdef readonly object name
-  cdef readonly int64_t nsecs
+  cdef readonly int64_t _nsecs
   cdef int32_t old_state_index
 
   def __init__(self, sampler, name, state_index, counter=None):
@@ -261,6 +189,16 @@ cdef class ScopedState(object):
     self.state_index = state_index
     self.counter = counter
 
+  @property
+  def nsecs(self):
+    return self._nsecs
+
+  def sampled_seconds(self):
+    return 1e-9 * self.nsecs
+
+  def __repr__(self):
+    return "ScopedState[%s, %s]" % (self.name, self.nsecs)
+
   cpdef __enter__(self):
     self.old_state_index = self.sampler.current_state_index
     pythread.PyThread_acquire_lock(self.sampler.lock, pythread.WAIT_LOCK)
@@ -273,9 +211,3 @@ cdef class ScopedState(object):
     self.sampler.current_state_index = self.old_state_index
     pythread.PyThread_release_lock(self.sampler.lock)
     self.sampler.state_transition_count += 1
-
-  def __repr__(self):
-    return "ScopedState[%s, %s, %s]" % (self.name, self.state_index, self.nsecs)
-
-  def sampled_seconds(self):
-    return 1e-9 * self.nsecs
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_slow.py b/sdks/python/apache_beam/runners/worker/statesampler_slow.py
new file mode 100644
index 0000000..dafe3b4
--- /dev/null
+++ b/sdks/python/apache_beam/runners/worker/statesampler_slow.py
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This module is experimental. No backwards-compatibility guarantees.
+
+
+class StateSampler(object):
+
+  def __init__(self, sampling_period_ms):
+    self._state_stack = [ScopedState(None, self, None)]
+    self.state_transition_count = 0
+    self.time_since_transition = 0
+    self.started = False
+    self.finished = False
+
+  def current_state(self):
+    """Returns the current execution state."""
+    return self._state_stack[-1]
+
+  def _scoped_state(self, counter_name, output_counter):
+    return ScopedState(self, counter_name, output_counter)
+
+  def _enter_state(self, state):
+    self.state_transition_count += 1
+    self._state_stack.append(state)
+
+  def _exit_state(self):
+    self.state_transition_count += 1
+    self._state_stack.pop()
+
+  def start(self):
+    # Sampling not yet supported. Only state tracking at the moment.
+    self.started = True
+
+  def stop(self):
+    self.finished = True
+
+  def get_info(self):
+    """Returns StateSamplerInfo with transition statistics."""
+    return StateSamplerInfo(
+        self.current_state().name, self.transition_count, 0)
+
+
+class ScopedState(object):
+
+  def __init__(self, sampler, name, counter=None):
+    self.state_sampler = sampler
+    self.name = name
+    self.counter = counter
+    self.nsecs = 0
+
+  def sampled_seconds(self):
+    return 1e-9 * self.nsecs
+
+  def __repr__(self):
+    return "ScopedState[%s, %s]" % (self.name, self.nsecs)
+
+  def __enter__(self):
+    self.state_sampler._enter_state(self)
+
+  def __exit__(self, exc_type, exc_value, traceback):
+    self.state_sampler._exit_state()
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py
index 2f2c8be..63dc6f8 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_test.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py
@@ -22,22 +22,13 @@ import logging
 import time
 import unittest
 
-from nose.plugins.skip import SkipTest
-
+from apache_beam.runners.worker import statesampler
 from apache_beam.utils.counters import CounterFactory
+from apache_beam.utils.counters import CounterName
 
 
 class StateSamplerTest(unittest.TestCase):
 
-  def setUp(self):
-    try:
-      # pylint: disable=global-variable-not-assigned
-      global statesampler
-      from apache_beam.runners.worker import statesampler
-    except ImportError:
-      raise SkipTest('State sampler not compiled.')
-    super(StateSamplerTest, self).setUp()
-
   def test_basic_sampler(self):
     # Set up state sampler.
     counter_factory = CounterFactory()
@@ -46,21 +37,38 @@ class StateSamplerTest(unittest.TestCase):
 
     # Run basic workload transitioning between 3 states.
     sampler.start()
-    with sampler.scoped_state('statea'):
+    with sampler.scoped_state('step1', 'statea'):
       time.sleep(0.1)
-      with sampler.scoped_state('stateb'):
+      self.assertEqual(
+          sampler.current_state().name,
+          CounterName(
+              'statea-msecs', step_name='step1', stage_name='basic'))
+      with sampler.scoped_state('step1', 'stateb'):
         time.sleep(0.2 / 2)
-        with sampler.scoped_state('statec'):
+        self.assertEqual(
+            sampler.current_state().name,
+            CounterName(
+                'stateb-msecs', step_name='step1', stage_name='basic'))
+        with sampler.scoped_state('step1', 'statec'):
           time.sleep(0.3)
+          self.assertEqual(
+              sampler.current_state().name,
+              CounterName(
+                  'statec-msecs', step_name='step1', stage_name='basic'))
         time.sleep(0.2 / 2)
+
     sampler.stop()
     sampler.commit_counters()
 
+    if not statesampler.FAST_SAMPLER:
+      # The slow sampler does not implement sampling, so we won't test it.
+      return
+
     # Test that sampled state timings are close to their expected values.
     expected_counter_values = {
-        'basic-statea-msecs': 100,
-        'basic-stateb-msecs': 200,
-        'basic-statec-msecs': 300,
+        CounterName('statea-msecs', step_name='step1', stage_name='basic'): 100,
+        CounterName('stateb-msecs', step_name='step1', stage_name='basic'): 200,
+        CounterName('statec-msecs', step_name='step1', stage_name='basic'): 300,
     }
     for counter in counter_factory.get_counters():
       self.assertIn(counter.name, expected_counter_values)
@@ -76,9 +84,9 @@ class StateSamplerTest(unittest.TestCase):
                                         sampling_period_ms=10)
 
     # Run basic workload transitioning between 3 states.
-    state_a = sampler.scoped_state('statea')
-    state_b = sampler.scoped_state('stateb')
-    state_c = sampler.scoped_state('statec')
+    state_a = sampler.scoped_state('step1', 'statea')
+    state_b = sampler.scoped_state('step1', 'stateb')
+    state_c = sampler.scoped_state('step1', 'statec')
     start_time = time.time()
     sampler.start()
     for _ in range(100000):
diff --git a/sdks/python/apache_beam/utils/counters.py b/sdks/python/apache_beam/utils/counters.py
index e2e0a1a..95b2117 100644
--- a/sdks/python/apache_beam/utils/counters.py
+++ b/sdks/python/apache_beam/utils/counters.py
@@ -66,9 +66,6 @@ class CounterName(_CounterName):
                                            system_name, namespace,
                                            origin, output_index, io_target)
 
-  def __str__(self):
-    return '%s' % self._str_internal()
-
   def __repr__(self):
     return '<CounterName<%s> at %s>' % (self._str_internal(), hex(id(self)))
 

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <commits@beam.apache.org>.

Reply via email to