This is an automated email from the ASF dual-hosted git repository. robertwb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 24e6bf8 [BEAM-2732] Starting refactor of state tracking in Python (#4375) 24e6bf8 is described below commit 24e6bf81790b97024fe1871575186d9db325bf1d Author: Pablo <pabl...@users.noreply.github.com> AuthorDate: Thu Jan 18 13:39:48 2018 -0800 [BEAM-2732] Starting refactor of state tracking in Python (#4375) Also giving the Python-only state sampler full functionality. --- .../runners/portability/fn_api_runner_test.py | 7 +- .../runners/portability/maptask_executor_runner.py | 6 +- .../apache_beam/runners/worker/bundle_processor.py | 7 +- .../apache_beam/runners/worker/statesampler.py | 81 ++++++++++++ .../runners/worker/statesampler_fake.py | 51 -------- .../{statesampler.pyx => statesampler_fast.pyx} | 136 ++++++--------------- .../runners/worker/statesampler_slow.py | 76 ++++++++++++ .../runners/worker/statesampler_test.py | 48 +++++--- sdks/python/apache_beam/utils/counters.py | 3 - 9 files changed, 225 insertions(+), 190 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py index 6304f71..83bb83a 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py @@ -23,13 +23,14 @@ import apache_beam as beam from apache_beam.runners.portability import fn_api_runner from apache_beam.runners.portability import maptask_executor_runner_test from apache_beam.runners.worker import sdk_worker +from apache_beam.runners.worker import statesampler from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.transforms import window -try: - from apache_beam.runners.worker.statesampler import DEFAULT_SAMPLING_PERIOD_MS -except ImportError: +if statesampler.FAST_SAMPLER: + DEFAULT_SAMPLING_PERIOD_MS = statesampler.DEFAULT_SAMPLING_PERIOD_MS +else: 
DEFAULT_SAMPLING_PERIOD_MS = 0 diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py index 913ccd5..74c6b03 100644 --- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py +++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py @@ -36,15 +36,11 @@ from apache_beam.runners.runner import PipelineRunner from apache_beam.runners.runner import PipelineState from apache_beam.runners.worker import operation_specs from apache_beam.runners.worker import operations +from apache_beam.runners.worker import statesampler from apache_beam.typehints import typehints from apache_beam.utils import profiler from apache_beam.utils.counters import CounterFactory -try: - from apache_beam.runners.worker import statesampler -except ImportError: - from apache_beam.runners.worker import statesampler_fake as statesampler - # This module is experimental. No backwards-compatibility guarantees. diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 97c318b..9bc9056 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -38,6 +38,7 @@ from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.runners import pipeline_context from apache_beam.runners.worker import operation_specs from apache_beam.runners.worker import operations +from apache_beam.runners.worker import statesampler from apache_beam.transforms import sideinputs from apache_beam.utils import counters from apache_beam.utils import proto_utils @@ -46,12 +47,6 @@ from apache_beam.utils import urns # This module is experimental. No backwards-compatibility guarantees. 
-try: - from apache_beam.runners.worker import statesampler -except ImportError: - from apache_beam.runners.worker import statesampler_fake as statesampler - - DATA_INPUT_URN = 'urn:org.apache.beam:source:runner:0.1' DATA_OUTPUT_URN = 'urn:org.apache.beam:sink:runner:0.1' IDENTITY_DOFN_URN = 'urn:org.apache.beam:dofn:identity:0.1' diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py b/sdks/python/apache_beam/runners/worker/statesampler.py new file mode 100644 index 0000000..03af644 --- /dev/null +++ b/sdks/python/apache_beam/runners/worker/statesampler.py @@ -0,0 +1,81 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This module is experimental. No backwards-compatibility guarantees. 
+from collections import namedtuple + +from apache_beam.utils.counters import Counter +from apache_beam.utils.counters import CounterName + +try: + from apache_beam.runners.worker import statesampler_fast as statesampler_impl + FAST_SAMPLER = True +except ImportError: + from apache_beam.runners.worker import statesampler_slow as statesampler_impl + FAST_SAMPLER = False + + +StateSamplerInfo = namedtuple( + 'StateSamplerInfo', + ['state_name', 'transition_count', 'time_since_transition']) + + +# Default period for sampling current state of pipeline execution. +DEFAULT_SAMPLING_PERIOD_MS = 200 + + +class StateSampler(statesampler_impl.StateSampler): + + def __init__(self, prefix, counter_factory, + sampling_period_ms=DEFAULT_SAMPLING_PERIOD_MS): + self.states_by_name = {} + self._prefix = prefix + self._counter_factory = counter_factory + self._states_by_name = {} + self.sampling_period_ms = sampling_period_ms + super(StateSampler, self).__init__(sampling_period_ms) + + def stop_if_still_running(self): + if self.started and not self.finished: + self.stop() + + def get_info(self): + """Returns StateSamplerInfo with transition statistics.""" + return StateSamplerInfo( + self.current_state().name, + self.state_transition_count, + self.time_since_transition) + + def scoped_state(self, step_name, state_name, io_target=None): + counter_name = CounterName(state_name + '-msecs', + stage_name=self._prefix, + step_name=step_name, + io_target=io_target) + if counter_name in self._states_by_name: + return self._states_by_name[counter_name] + else: + output_counter = self._counter_factory.get_counter(counter_name, + Counter.SUM) + self._states_by_name[counter_name] = super( + StateSampler, self)._scoped_state(counter_name, output_counter) + return self._states_by_name[counter_name] + + def commit_counters(self): + """Updates output counters with latest state statistics.""" + for state in self._states_by_name.values(): + state_msecs = int(1e-6 * state.nsecs) + 
state.counter.update(state_msecs - state.counter.value()) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fake.py b/sdks/python/apache_beam/runners/worker/statesampler_fake.py deleted file mode 100644 index bc56021..0000000 --- a/sdks/python/apache_beam/runners/worker/statesampler_fake.py +++ /dev/null @@ -1,51 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# This module is experimental. No backwards-compatibility guarantees. 
- - -class StateSampler(object): - - def __init__(self, *args, **kwargs): - pass - - def scoped_state(self, step_name, state_name=None, io_target=None): - return _FakeScopedState() - - def start(self): - pass - - def stop(self): - pass - - def stop_if_still_running(self): - self.stop() - - def commit_counters(self): - pass - - -class _FakeScopedState(object): - - def __enter__(self): - pass - - def __exit__(self, *unused_args): - pass - - def sampled_seconds(self): - return 0 diff --git a/sdks/python/apache_beam/runners/worker/statesampler.pyx b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx similarity index 64% rename from sdks/python/apache_beam/runners/worker/statesampler.pyx rename to sdks/python/apache_beam/runners/worker/statesampler_fast.pyx index 1e37196..d0b1878 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler.pyx +++ b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx @@ -34,13 +34,7 @@ thread queries the current state, the time spent since the previous sample is attributed to that state and accumulated. Over time, this allows a granular runtime profile to be produced. """ - import threading -import time - - -from apache_beam.utils.counters import Counter -from apache_beam.utils.counters import CounterName cimport cython from cpython cimport pythread @@ -71,37 +65,14 @@ cdef inline int64_t get_nsec_time() nogil: current_time.tv_nsec) -class StateSamplerInfo(object): - """Info for current state and transition statistics of StateSampler.""" - - def __init__(self, state_name, transition_count, time_since_transition): - self.state_name = state_name - self.transition_count = transition_count - self.time_since_transition = time_since_transition - - def __repr__(self): - return ('<StateSamplerInfo state: %s time: %dns transitions: %d>' - % (self.state_name, - self.time_since_transition, - self.transition_count)) - - -# Default period for sampling current state of pipeline execution. 
-DEFAULT_SAMPLING_PERIOD_MS = 200 - - cdef class StateSampler(object): """Tracks time spent in states during pipeline execution.""" + cdef int _sampling_period_ms - cdef object prefix - cdef object counter_factory - cdef int sampling_period_ms - - cdef dict scoped_states_by_name cdef list scoped_states_by_index - cdef bint started - cdef bint finished + cdef public bint started + cdef public bint finished cdef object sampling_thread # This lock guards members that are shared between threads, specificaly @@ -109,22 +80,16 @@ cdef class StateSampler(object): cdef pythread.PyThread_type_lock lock cdef public int64_t state_transition_count - cdef int64_t time_since_transition + cdef public int64_t time_since_transition cdef int32_t current_state_index - def __init__(self, prefix, counter_factory, - sampling_period_ms=DEFAULT_SAMPLING_PERIOD_MS): - - # TODO(pabloem): Remove this once all dashed prefixes are removed from - # the worker. - # We stop using prefixes with included dash. - self.prefix = prefix[:-1] if prefix[-1] == '-' else prefix - self.counter_factory = counter_factory - self.sampling_period_ms = sampling_period_ms + def __init__(self, sampling_period_ms, *args): + self._sampling_period_ms = sampling_period_ms + self.started = False + self.finished = False self.lock = pythread.PyThread_allocate_lock() - self.scoped_states_by_name = {} self.current_state_index = 0 self.time_since_transition = 0 @@ -132,7 +97,6 @@ cdef class StateSampler(object): unknown_state = ScopedState(self, 'unknown', self.current_state_index) pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK) self.scoped_states_by_index = [unknown_state] - self.finished = False pythread.PyThread_release_lock(self.lock) # Assert that the compiler correctly aligned the current_state field. 
This @@ -152,7 +116,7 @@ cdef class StateSampler(object): cdef int64_t latest_transition_count = self.state_transition_count with nogil: while True: - usleep(self.sampling_period_ms * 1000) + usleep(self._sampling_period_ms * 1000) pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK) try: if self.finished: @@ -161,7 +125,7 @@ cdef class StateSampler(object): # Take an address as we can't create a reference to the scope # without the GIL. nsecs_ptr = &(<ScopedState>PyList_GET_ITEM( - self.scoped_states_by_index, self.current_state_index)).nsecs + self.scoped_states_by_index, self.current_state_index))._nsecs nsecs_ptr[0] += elapsed_nsecs if latest_transition_count != self.state_transition_count: self.time_since_transition = 0 @@ -186,64 +150,28 @@ cdef class StateSampler(object): # pythread doesn't support conditions. self.sampling_thread.join() - def stop_if_still_running(self): - if self.started and not self.finished: - self.stop() - - def get_info(self): - """Returns StateSamplerInfo with transition statistics.""" - return StateSamplerInfo( - self.scoped_states_by_index[self.current_state_index].name, - self.state_transition_count, - self.time_since_transition) + def current_state(self): + return self.scoped_states_by_index[self.current_state_index] - # TODO(pabloem): Make state_name required once all callers migrate, - # and the legacy path is removed. - def scoped_state(self, step_name, state_name=None, io_target=None): + cpdef _scoped_state(self, counter_name, output_counter): """Returns a context manager managing transitions for a given state. Args: - step_name: A string with the name of the running step. - state_name: A string with the name of the state (e.g. 'process', 'start') - io_target: An IOTargetName object describing the io_target (e.g. writing - or reading to side inputs, shuffle or state). Will often be None. + TODO(pabloem) Returns: A ScopedState for the set of step-state-io_target. 
""" - cdef ScopedState scoped_state - if state_name is None: - # If state_name is None, the worker is still using old style - # msec counters. - counter_name = '%s-%s-msecs' % (self.prefix, step_name) - scoped_state = self.scoped_states_by_name.get(counter_name, None) - else: - counter_name = CounterName(state_name + '-msecs', - stage_name=self.prefix, - step_name=step_name, - io_target=io_target) - scoped_state = self.scoped_states_by_name.get(counter_name, None) - - if scoped_state is None: - output_counter = self.counter_factory.get_counter(counter_name, - Counter.SUM) - new_state_index = len(self.scoped_states_by_index) - scoped_state = ScopedState(self, counter_name, - new_state_index, output_counter) - # Both scoped_states_by_index and scoped_state.nsecs are accessed - # by the sampling thread; initialize them under the lock. - pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK) - self.scoped_states_by_index.append(scoped_state) - scoped_state.nsecs = 0 - pythread.PyThread_release_lock(self.lock) - self.scoped_states_by_name[counter_name] = scoped_state + new_state_index = len(self.scoped_states_by_index) + scoped_state = ScopedState(self, counter_name, + new_state_index, output_counter) + # Both scoped_states_by_index and scoped_state.nsecs are accessed + # by the sampling thread; initialize them under the lock. 
+ pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK) + self.scoped_states_by_index.append(scoped_state) + scoped_state._nsecs = 0 + pythread.PyThread_release_lock(self.lock) return scoped_state - def commit_counters(self): - """Updates output counters with latest state statistics.""" - for state in self.scoped_states_by_name.values(): - state_msecs = int(1e-6 * state.nsecs) - state.counter.update(state_msecs - state.counter.value()) - cdef class ScopedState(object): """Context manager class managing transitions for a given sampler state.""" @@ -252,7 +180,7 @@ cdef class ScopedState(object): cdef readonly int32_t state_index cdef readonly object counter cdef readonly object name - cdef readonly int64_t nsecs + cdef readonly int64_t _nsecs cdef int32_t old_state_index def __init__(self, sampler, name, state_index, counter=None): @@ -261,6 +189,16 @@ cdef class ScopedState(object): self.state_index = state_index self.counter = counter + @property + def nsecs(self): + return self._nsecs + + def sampled_seconds(self): + return 1e-9 * self.nsecs + + def __repr__(self): + return "ScopedState[%s, %s]" % (self.name, self.nsecs) + cpdef __enter__(self): self.old_state_index = self.sampler.current_state_index pythread.PyThread_acquire_lock(self.sampler.lock, pythread.WAIT_LOCK) @@ -273,9 +211,3 @@ cdef class ScopedState(object): self.sampler.current_state_index = self.old_state_index pythread.PyThread_release_lock(self.sampler.lock) self.sampler.state_transition_count += 1 - - def __repr__(self): - return "ScopedState[%s, %s, %s]" % (self.name, self.state_index, self.nsecs) - - def sampled_seconds(self): - return 1e-9 * self.nsecs diff --git a/sdks/python/apache_beam/runners/worker/statesampler_slow.py b/sdks/python/apache_beam/runners/worker/statesampler_slow.py new file mode 100644 index 0000000..dafe3b4 --- /dev/null +++ b/sdks/python/apache_beam/runners/worker/statesampler_slow.py @@ -0,0 +1,76 @@ +# +# Licensed to the Apache Software Foundation (ASF) under 
one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This module is experimental. No backwards-compatibility guarantees. + + +class StateSampler(object): + + def __init__(self, sampling_period_ms): + self._state_stack = [ScopedState(None, self, None)] + self.state_transition_count = 0 + self.time_since_transition = 0 + self.started = False + self.finished = False + + def current_state(self): + """Returns the current execution state.""" + return self._state_stack[-1] + + def _scoped_state(self, counter_name, output_counter): + return ScopedState(self, counter_name, output_counter) + + def _enter_state(self, state): + self.state_transition_count += 1 + self._state_stack.append(state) + + def _exit_state(self): + self.state_transition_count += 1 + self._state_stack.pop() + + def start(self): + # Sampling not yet supported. Only state tracking at the moment. 
+ self.started = True + + def stop(self): + self.finished = True + + def get_info(self): + """Returns StateSamplerInfo with transition statistics.""" + return StateSamplerInfo( + self.current_state().name, self.transition_count, 0) + + +class ScopedState(object): + + def __init__(self, sampler, name, counter=None): + self.state_sampler = sampler + self.name = name + self.counter = counter + self.nsecs = 0 + + def sampled_seconds(self): + return 1e-9 * self.nsecs + + def __repr__(self): + return "ScopedState[%s, %s]" % (self.name, self.nsecs) + + def __enter__(self): + self.state_sampler._enter_state(self) + + def __exit__(self, exc_type, exc_value, traceback): + self.state_sampler._exit_state() diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index 2f2c8be..63dc6f8 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -22,22 +22,13 @@ import logging import time import unittest -from nose.plugins.skip import SkipTest - +from apache_beam.runners.worker import statesampler from apache_beam.utils.counters import CounterFactory +from apache_beam.utils.counters import CounterName class StateSamplerTest(unittest.TestCase): - def setUp(self): - try: - # pylint: disable=global-variable-not-assigned - global statesampler - from apache_beam.runners.worker import statesampler - except ImportError: - raise SkipTest('State sampler not compiled.') - super(StateSamplerTest, self).setUp() - def test_basic_sampler(self): # Set up state sampler. counter_factory = CounterFactory() @@ -46,21 +37,38 @@ class StateSamplerTest(unittest.TestCase): # Run basic workload transitioning between 3 states. 
sampler.start() - with sampler.scoped_state('statea'): + with sampler.scoped_state('step1', 'statea'): time.sleep(0.1) - with sampler.scoped_state('stateb'): + self.assertEqual( + sampler.current_state().name, + CounterName( + 'statea-msecs', step_name='step1', stage_name='basic')) + with sampler.scoped_state('step1', 'stateb'): time.sleep(0.2 / 2) - with sampler.scoped_state('statec'): + self.assertEqual( + sampler.current_state().name, + CounterName( + 'stateb-msecs', step_name='step1', stage_name='basic')) + with sampler.scoped_state('step1', 'statec'): time.sleep(0.3) + self.assertEqual( + sampler.current_state().name, + CounterName( + 'statec-msecs', step_name='step1', stage_name='basic')) time.sleep(0.2 / 2) + sampler.stop() sampler.commit_counters() + if not statesampler.FAST_SAMPLER: + # The slow sampler does not implement sampling, so we won't test it. + return + # Test that sampled state timings are close to their expected values. expected_counter_values = { - 'basic-statea-msecs': 100, - 'basic-stateb-msecs': 200, - 'basic-statec-msecs': 300, + CounterName('statea-msecs', step_name='step1', stage_name='basic'): 100, + CounterName('stateb-msecs', step_name='step1', stage_name='basic'): 200, + CounterName('statec-msecs', step_name='step1', stage_name='basic'): 300, } for counter in counter_factory.get_counters(): self.assertIn(counter.name, expected_counter_values) @@ -76,9 +84,9 @@ class StateSamplerTest(unittest.TestCase): sampling_period_ms=10) # Run basic workload transitioning between 3 states. 
- state_a = sampler.scoped_state('statea') - state_b = sampler.scoped_state('stateb') - state_c = sampler.scoped_state('statec') + state_a = sampler.scoped_state('step1', 'statea') + state_b = sampler.scoped_state('step1', 'stateb') + state_c = sampler.scoped_state('step1', 'statec') start_time = time.time() sampler.start() for _ in range(100000): diff --git a/sdks/python/apache_beam/utils/counters.py b/sdks/python/apache_beam/utils/counters.py index e2e0a1a..95b2117 100644 --- a/sdks/python/apache_beam/utils/counters.py +++ b/sdks/python/apache_beam/utils/counters.py @@ -66,9 +66,6 @@ class CounterName(_CounterName): system_name, namespace, origin, output_index, io_target) - def __str__(self): - return '%s' % self._str_internal() - def __repr__(self): return '<CounterName<%s> at %s>' % (self._str_internal(), hex(id(self))) -- To stop receiving notification emails like this one, please contact commits@beam.apache.org.