[ https://issues.apache.org/jira/browse/BEAM-2732?focusedWorklogId=120984&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-120984 ]
ASF GitHub Bot logged work on BEAM-2732: ---------------------------------------- Author: ASF GitHub Bot Created on: 09/Jul/18 19:46 Start Date: 09/Jul/18 19:46 Worklog Time Spent: 10m Work Description: pabloem closed pull request #5356: [BEAM-2732][BEAM-4028] Logging relies on StateSampler for context URL: https://github.com/apache/beam/pull/5356 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index 5c5eba22732..4bb226492ba 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -81,9 +81,7 @@ cdef class PerWindowInvoker(DoFnInvoker): cdef class DoFnRunner(Receiver): cdef DoFnContext context - cdef LoggingContext logging_context cdef object step_name - cdef ScopedMetricsContainer scoped_metrics_container cdef list side_inputs cdef DoFnInvoker do_fn_invoker @@ -112,15 +110,5 @@ cdef class DoFnContext(object): cpdef set_element(self, WindowedValue windowed_value) -cdef class LoggingContext(object): - # TODO(robertwb): Optimize "with [cdef class]" - cpdef enter(self) - cpdef exit(self) - - -cdef class _LoggingContextAdapter(LoggingContext): - cdef object underlying - - cdef class _ReceiverAdapter(Receiver): cdef object underlying diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index d5f35de988f..88745c778e3 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -119,16 +119,6 @@ def logging_name(self): return self.user_name -class LoggingContext(object): - """For internal use only; no backwards-compatibility guarantees.""" - - def enter(self): - pass - - def exit(self): - pass - - class 
Receiver(object): """For internal use only; no backwards-compatibility guarantees. @@ -551,20 +541,15 @@ def __init__(self, windowing: windowing properties of the output PCollection(s) tagged_receivers: a dict of tag name to Receiver objects step_name: the name of this step - logging_context: a LoggingContext object + logging_context: DEPRECATED [BEAM-4728] state: handle for accessing DoFn state - scoped_metrics_container: Context switcher for metrics container + scoped_metrics_container: DEPRECATED operation_name: The system name assigned by the runner for this operation. """ # Need to support multiple iterations. side_inputs = list(side_inputs) - from apache_beam.metrics.execution import ScopedMetricsContainer - - self.scoped_metrics_container = ( - scoped_metrics_container or ScopedMetricsContainer()) self.step_name = step_name - self.logging_context = logging_context or LoggingContext() self.context = DoFnContext(step_name, state=state) do_fn_signature = DoFnSignature(fn) @@ -595,26 +580,16 @@ def receive(self, windowed_value): def process(self, windowed_value): try: - self.logging_context.enter() - self.scoped_metrics_container.enter() self.do_fn_invoker.invoke_process(windowed_value) except BaseException as exn: self._reraise_augmented(exn) - finally: - self.scoped_metrics_container.exit() - self.logging_context.exit() def _invoke_bundle_method(self, bundle_method): try: - self.logging_context.enter() - self.scoped_metrics_container.enter() self.context.set_element(None) bundle_method() except BaseException as exn: self._reraise_augmented(exn) - finally: - self.scoped_metrics_container.exit() - self.logging_context.exit() def start(self): self._invoke_bundle_method(self.do_fn_invoker.invoke_start_bundle) diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 4193ea2debb..958731d0ce4 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ 
b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -63,13 +63,12 @@ class RunnerIOOperation(operations.Operation): """Common baseclass for runner harness IO operations.""" - def __init__(self, operation_name, step_name, consumers, counter_factory, + def __init__(self, name_context, step_name, consumers, counter_factory, state_sampler, windowed_coder, target, data_channel): super(RunnerIOOperation, self).__init__( - operation_name, None, counter_factory, state_sampler) + name_context, None, counter_factory, state_sampler) self.windowed_coder = windowed_coder self.windowed_coder_impl = windowed_coder.get_impl() - self.step_name = step_name # target represents the consumer for the bytes in the data plane for a # DataInputOperation or a producer of these bytes for a DataOutputOperation. self.target = target @@ -106,9 +105,9 @@ def __init__(self, operation_name, step_name, consumers, counter_factory, windowed_coder, target=input_target, data_channel=data_channel) # We must do this manually as we don't have a spec or spec.output_coders. self.receivers = [ - operations.ConsumerSet(self.counter_factory, self.step_name, 0, - next(itervalues(consumers)), - self.windowed_coder)] + operations.ConsumerSet( + self.counter_factory, self.name_context.step_name, 0, + next(itervalues(consumers)), self.windowed_coder)] def process(self, windowed_value): self.output(windowed_value) diff --git a/sdks/python/apache_beam/runners/worker/logger.pxd b/sdks/python/apache_beam/runners/worker/logger.pxd deleted file mode 100644 index 201daf4e29a..00000000000 --- a/sdks/python/apache_beam/runners/worker/logger.pxd +++ /dev/null @@ -1,25 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. 
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -cimport cython - -from apache_beam.runners.common cimport LoggingContext - - -cdef class PerThreadLoggingContext(LoggingContext): - cdef kwargs - cdef list stack diff --git a/sdks/python/apache_beam/runners/worker/logger.py b/sdks/python/apache_beam/runners/worker/logger.py index 07cd320ff82..ae9cdd3ac75 100644 --- a/sdks/python/apache_beam/runners/worker/logger.py +++ b/sdks/python/apache_beam/runners/worker/logger.py @@ -26,7 +26,7 @@ import threading import traceback -from apache_beam.runners.common import LoggingContext +from apache_beam.runners.worker import statesampler # This module is experimental. No backwards-compatibility guarantees. @@ -38,7 +38,6 @@ class _PerThreadWorkerData(threading.local): def __init__(self): super(_PerThreadWorkerData, self).__init__() - # TODO(robertwb): Consider starting with an initial (ignored) ~20 elements # in the list, as going up and down all the way to zero incurs several # reallocations. 
self.stack = [] @@ -53,7 +52,7 @@ def get_data(self): per_thread_worker_data = _PerThreadWorkerData() -class PerThreadLoggingContext(LoggingContext): +class PerThreadLoggingContext(object): """A context manager to add per thread attributes.""" def __init__(self, **kwargs): @@ -150,10 +149,14 @@ def format(self, record): data = per_thread_worker_data.get_data() if 'work_item_id' in data: output['work'] = data['work_item_id'] - if 'stage_name' in data: - output['stage'] = data['stage_name'] - if 'step_name' in data: - output['step'] = data['step_name'] + + tracker = statesampler.get_current_tracker() + if tracker: + output['stage'] = tracker.stage_name + + if tracker.current_state() and tracker.current_state().name_context: + output['step'] = tracker.current_state().name_context.logging_name() + # All logging happens using the root logger. We will add the basename of the # file and the function name where the logging happened to make it easier # to identify who generated the record. diff --git a/sdks/python/apache_beam/runners/worker/logger_test.py b/sdks/python/apache_beam/runners/worker/logger_test.py index 73ec1aa3ad9..c131775b8e0 100644 --- a/sdks/python/apache_beam/runners/worker/logger_test.py +++ b/sdks/python/apache_beam/runners/worker/logger_test.py @@ -18,6 +18,7 @@ """Tests for worker logging utilities.""" from __future__ import absolute_import +from __future__ import unicode_literals import json import logging @@ -27,6 +28,8 @@ from builtins import object from apache_beam.runners.worker import logger +from apache_beam.runners.worker import statesampler +from apache_beam.utils.counters import CounterFactory class PerThreadLoggingContextTest(unittest.TestCase): @@ -129,30 +132,38 @@ def test_record_with_arbitrary_messages(self): self.execute_multiple_cases(test_cases) def test_record_with_per_thread_info(self): - with logger.PerThreadLoggingContext( - work_item_id='workitem', stage_name='stage', step_name='step'): - formatter = 
logger.JsonLogFormatter(job_id='jobid', worker_id='workerid') - record = self.create_log_record(**self.SAMPLE_RECORD) - log_output = json.loads(formatter.format(record)) + self.maxDiff = None + tracker = statesampler.StateSampler('stage', CounterFactory()) + statesampler.set_current_tracker(tracker) + formatter = logger.JsonLogFormatter(job_id='jobid', worker_id='workerid') + with logger.PerThreadLoggingContext(work_item_id='workitem'): + with tracker.scoped_state('step', 'process'): + record = self.create_log_record(**self.SAMPLE_RECORD) + log_output = json.loads(formatter.format(record)) expected_output = dict(self.SAMPLE_OUTPUT) expected_output.update( {'work': 'workitem', 'stage': 'stage', 'step': 'step'}) self.assertEqual(log_output, expected_output) + statesampler.set_current_tracker(None) def test_nested_with_per_thread_info(self): + self.maxDiff = None + tracker = statesampler.StateSampler('stage', CounterFactory()) + statesampler.set_current_tracker(tracker) formatter = logger.JsonLogFormatter(job_id='jobid', worker_id='workerid') - with logger.PerThreadLoggingContext( - work_item_id='workitem', stage_name='stage', step_name='step1'): - record = self.create_log_record(**self.SAMPLE_RECORD) - log_output1 = json.loads(formatter.format(record)) - - with logger.PerThreadLoggingContext(step_name='step2'): + with logger.PerThreadLoggingContext(work_item_id='workitem'): + with tracker.scoped_state('step1', 'process'): record = self.create_log_record(**self.SAMPLE_RECORD) - log_output2 = json.loads(formatter.format(record)) + log_output1 = json.loads(formatter.format(record)) - record = self.create_log_record(**self.SAMPLE_RECORD) - log_output3 = json.loads(formatter.format(record)) + with tracker.scoped_state('step2', 'process'): + record = self.create_log_record(**self.SAMPLE_RECORD) + log_output2 = json.loads(formatter.format(record)) + + record = self.create_log_record(**self.SAMPLE_RECORD) + log_output3 = json.loads(formatter.format(record)) + 
statesampler.set_current_tracker(None) record = self.create_log_record(**self.SAMPLE_RECORD) log_output4 = json.loads(formatter.format(record)) diff --git a/sdks/python/apache_beam/runners/worker/operation_specs.py b/sdks/python/apache_beam/runners/worker/operation_specs.py index 58ba5716c0e..d64920f18fe 100644 --- a/sdks/python/apache_beam/runners/worker/operation_specs.py +++ b/sdks/python/apache_beam/runners/worker/operation_specs.py @@ -378,9 +378,9 @@ def __init__(self, operations, stage_name, step_names=None, original_names=None, name_contexts=None): + # TODO(BEAM-4028): Remove arguments other than name_contexts. self.operations = operations self.stage_name = stage_name - # TODO(BEAM-4028): Remove arguments other than name_contexts. self.name_contexts = name_contexts or self._make_name_contexts( original_names, step_names, system_names) diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index 143974eecf6..78a67bcd457 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -38,7 +38,6 @@ from apache_beam.runners import common from apache_beam.runners.common import Receiver from apache_beam.runners.dataflow.internal.names import PropertyNames -from apache_beam.runners.worker import logger from apache_beam.runners.worker import opcounters from apache_beam.runners.worker import operation_specs from apache_beam.runners.worker import sideinputs @@ -127,10 +126,6 @@ def __init__(self, name_context, spec, counter_factory, state_sampler): else: self.name_context = common.NameContext(name_context) - # TODO(BEAM-4028): Remove following two lines. Rely on name context. 
- self.operation_name = self.name_context.step_name - self.step_name = self.name_context.logging_name() - self.spec = spec self.counter_factory = counter_factory self.consumers = collections.defaultdict(list) @@ -143,14 +138,11 @@ def __init__(self, name_context, spec, counter_factory, state_sampler): self.state_sampler = state_sampler self.scoped_start_state = self.state_sampler.scoped_state( - self.name_context.metrics_name(), 'start', - metrics_container=self.metrics_container) + self.name_context, 'start', metrics_container=self.metrics_container) self.scoped_process_state = self.state_sampler.scoped_state( - self.name_context.metrics_name(), 'process', - metrics_container=self.metrics_container) + self.name_context, 'process', metrics_container=self.metrics_container) self.scoped_finish_state = self.state_sampler.scoped_state( - self.name_context.metrics_name(), 'finish', - metrics_container=self.metrics_container) + self.name_context, 'finish', metrics_container=self.metrics_container) # TODO(ccy): the '-abort' state can be added when the abort is supported in # Operations. 
self.receivers = [] @@ -390,11 +382,9 @@ def start(self): fn, args, kwargs, self.side_input_maps, window_fn, tagged_receivers=self.tagged_receivers, step_name=self.name_context.logging_name(), - logging_context=logger.PerThreadLoggingContext( - step_name=self.name_context.logging_name()), state=state, - scoped_metrics_container=None, operation_name=self.name_context.metrics_name()) + self.dofn_receiver = (self.dofn_runner if isinstance(self.dofn_runner, Receiver) else DoFnRunnerReceiver(self.dofn_runner)) diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py b/sdks/python/apache_beam/runners/worker/statesampler.py index b0c2b67f9ff..b73029cf29c 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler.py +++ b/sdks/python/apache_beam/runners/worker/statesampler.py @@ -22,6 +22,7 @@ import threading from collections import namedtuple +from apache_beam.runners import common from apache_beam.utils.counters import Counter from apache_beam.utils.counters import CounterName @@ -69,8 +70,18 @@ def __init__(self, prefix, counter_factory, self._states_by_name = {} self.sampling_period_ms = sampling_period_ms self.tracked_thread = None + self.finished = False + self.started = False super(StateSampler, self).__init__(sampling_period_ms) + @property + def stage_name(self): + return self._prefix + + def stop(self): + set_current_tracker(None) + super(StateSampler, self).stop() + def stop_if_still_running(self): if self.started and not self.finished: self.stop() @@ -90,13 +101,28 @@ def get_info(self): self.tracked_thread) def scoped_state(self, - step_name, + name_context, state_name, io_target=None, metrics_container=None): + """Returns a ScopedState object associated to a Step and a State. + + Args: + name_context: common.NameContext. It is the step name information. + state_name: str. It is the state name (e.g. process / start / finish). + io_target: + metrics_container: MetricsContainer. The step's metrics container. 
+ + Returns: + A ScopedState that keeps the execution context and is able to switch it + for the execution thread. + """ + if not isinstance(name_context, common.NameContext): + name_context = common.NameContext(name_context) + counter_name = CounterName(state_name + '-msecs', stage_name=self._prefix, - step_name=step_name, + step_name=name_context.metrics_name(), io_target=io_target) if counter_name in self._states_by_name: return self._states_by_name[counter_name] @@ -105,6 +131,7 @@ def scoped_state(self, Counter.SUM) self._states_by_name[counter_name] = super( StateSampler, self)._scoped_state(counter_name, + name_context, output_counter, metrics_container) return self._states_by_name[counter_name] diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fast.pxd b/sdks/python/apache_beam/runners/worker/statesampler_fast.pxd index 76b379b7a11..799bd0d4dbf 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_fast.pxd +++ b/sdks/python/apache_beam/runners/worker/statesampler_fast.pxd @@ -43,7 +43,8 @@ cdef class StateSampler(object): cdef int32_t current_state_index - cpdef _scoped_state(self, counter_name, output_counter, metrics_container) + cpdef _scoped_state( + self, counter_name, name_context, output_counter, metrics_container) cdef class ScopedState(object): """Context manager class managing transitions for a given sampler state.""" @@ -52,6 +53,7 @@ cdef class ScopedState(object): cdef readonly int32_t state_index cdef readonly object counter cdef readonly object name + cdef readonly object name_context cdef readonly int64_t _nsecs cdef int32_t old_state_index cdef readonly MetricsContainer _metrics_container diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx index fdf496979f9..8aa5217d8d1 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx +++ b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx @@ -90,8 +90,12 @@ cdef 
class StateSampler(object): self.current_state_index = 0 self.time_since_transition = 0 self.state_transition_count = 0 - unknown_state = ScopedState( - self, CounterName('unknown'), self.current_state_index) + unknown_state = ScopedState(self, + CounterName('unknown'), + None, + self.current_state_index, + None, + None) pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK) self.scoped_states_by_index = [unknown_state] pythread.PyThread_release_lock(self.lock) @@ -153,7 +157,7 @@ cdef class StateSampler(object): def current_state(self): return self.scoped_states_by_index[self.current_state_index] - cpdef _scoped_state(self, counter_name, output_counter, + cpdef _scoped_state(self, counter_name, name_context, output_counter, metrics_container): """Returns a context manager managing transitions for a given state. Args: @@ -168,6 +172,7 @@ cdef class StateSampler(object): new_state_index = len(self.scoped_states_by_index) scoped_state = ScopedState(self, counter_name, + name_context, new_state_index, output_counter, metrics_container) @@ -183,10 +188,16 @@ cdef class StateSampler(object): cdef class ScopedState(object): """Context manager class managing transitions for a given sampler state.""" - def __init__( - self, sampler, name, state_index, counter=None, metrics_container=None): + def __init__(self, + sampler, + name, + step_name_context, + state_index, + counter, + metrics_container): self.sampler = sampler self.name = name + self.name_context = step_name_context self.state_index = state_index self.counter = counter self._metrics_container = metrics_container diff --git a/sdks/python/apache_beam/runners/worker/statesampler_slow.py b/sdks/python/apache_beam/runners/worker/statesampler_slow.py index 2f09d0e8bb2..4b1bf830073 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_slow.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_slow.py @@ -21,15 +21,18 @@ from builtins import object +from apache_beam.runners import common +from 
apache_beam.utils import counters + class StateSampler(object): def __init__(self, sampling_period_ms): - self._state_stack = [ScopedState(None, self, None)] + self._state_stack = [ScopedState(self, + counters.CounterName('unknown'), + None)] self.state_transition_count = 0 self.time_since_transition = 0 - self.started = False - self.finished = False def current_state(self): """Returns the current execution state. @@ -40,9 +43,12 @@ def current_state(self): def _scoped_state(self, counter_name, + name_context, output_counter, metrics_container=None): - return ScopedState(self, counter_name, output_counter, metrics_container) + assert isinstance(name_context, common.NameContext) + return ScopedState( + self, counter_name, name_context, output_counter, metrics_container) def _enter_state(self, state): self.state_transition_count += 1 @@ -57,14 +63,16 @@ def start(self): pass def stop(self): - self.finished = True + pass class ScopedState(object): - def __init__(self, sampler, name, counter=None, metrics_container=None): + def __init__(self, sampler, name, step_name_context, + counter=None, metrics_container=None): self.state_sampler = sampler self.name = name + self.name_context = step_name_context self.counter = counter self.nsecs = 0 self.metrics_container = metrics_container ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 120984) Time Spent: 19h (was: 18h 50m) > State tracking in Python is inefficient and has duplicated code > --------------------------------------------------------------- > > Key: BEAM-2732 > URL: https://issues.apache.org/jira/browse/BEAM-2732 > Project: Beam > Issue Type: Bug > Components: sdk-py-core > Reporter: Pablo Estrada > Assignee: Pablo Estrada > Priority: Major > Time Spent: 19h > Remaining Estimate: 0h > > e.g. logging and metrics keep state separately. State tracking should be > unified. -- This message was sent by Atlassian JIRA (v7.6.3#76005)