[ https://issues.apache.org/jira/browse/BEAM-2732?focusedWorklogId=120984&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-120984 ]

ASF GitHub Bot logged work on BEAM-2732:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Jul/18 19:46
            Start Date: 09/Jul/18 19:46
    Worklog Time Spent: 10m 
      Work Description: pabloem closed pull request #5356: 
[BEAM-2732][BEAM-4028] Logging relies on StateSampler for context
URL: https://github.com/apache/beam/pull/5356
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 5c5eba22732..4bb226492ba 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -81,9 +81,7 @@ cdef class PerWindowInvoker(DoFnInvoker):
 
 cdef class DoFnRunner(Receiver):
   cdef DoFnContext context
-  cdef LoggingContext logging_context
   cdef object step_name
-  cdef ScopedMetricsContainer scoped_metrics_container
   cdef list side_inputs
   cdef DoFnInvoker do_fn_invoker
 
@@ -112,15 +110,5 @@ cdef class DoFnContext(object):
   cpdef set_element(self, WindowedValue windowed_value)
 
 
-cdef class LoggingContext(object):
-  # TODO(robertwb): Optimize "with [cdef class]"
-  cpdef enter(self)
-  cpdef exit(self)
-
-
-cdef class _LoggingContextAdapter(LoggingContext):
-  cdef object underlying
-
-
 cdef class _ReceiverAdapter(Receiver):
   cdef object underlying
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index d5f35de988f..88745c778e3 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -119,16 +119,6 @@ def logging_name(self):
     return self.user_name
 
 
-class LoggingContext(object):
-  """For internal use only; no backwards-compatibility guarantees."""
-
-  def enter(self):
-    pass
-
-  def exit(self):
-    pass
-
-
 class Receiver(object):
   """For internal use only; no backwards-compatibility guarantees.
 
@@ -551,20 +541,15 @@ def __init__(self,
       windowing: windowing properties of the output PCollection(s)
       tagged_receivers: a dict of tag name to Receiver objects
       step_name: the name of this step
-      logging_context: a LoggingContext object
+      logging_context: DEPRECATED [BEAM-4728]
       state: handle for accessing DoFn state
-      scoped_metrics_container: Context switcher for metrics container
+      scoped_metrics_container: DEPRECATED
       operation_name: The system name assigned by the runner for this operation.
     """
     # Need to support multiple iterations.
     side_inputs = list(side_inputs)
 
-    from apache_beam.metrics.execution import ScopedMetricsContainer
-
-    self.scoped_metrics_container = (
-        scoped_metrics_container or ScopedMetricsContainer())
     self.step_name = step_name
-    self.logging_context = logging_context or LoggingContext()
     self.context = DoFnContext(step_name, state=state)
 
     do_fn_signature = DoFnSignature(fn)
@@ -595,26 +580,16 @@ def receive(self, windowed_value):
 
   def process(self, windowed_value):
     try:
-      self.logging_context.enter()
-      self.scoped_metrics_container.enter()
       self.do_fn_invoker.invoke_process(windowed_value)
     except BaseException as exn:
       self._reraise_augmented(exn)
-    finally:
-      self.scoped_metrics_container.exit()
-      self.logging_context.exit()
 
   def _invoke_bundle_method(self, bundle_method):
     try:
-      self.logging_context.enter()
-      self.scoped_metrics_container.enter()
       self.context.set_element(None)
       bundle_method()
     except BaseException as exn:
       self._reraise_augmented(exn)
-    finally:
-      self.scoped_metrics_container.exit()
-      self.logging_context.exit()
 
   def start(self):
     self._invoke_bundle_method(self.do_fn_invoker.invoke_start_bundle)
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 4193ea2debb..958731d0ce4 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -63,13 +63,12 @@
 class RunnerIOOperation(operations.Operation):
   """Common baseclass for runner harness IO operations."""
 
-  def __init__(self, operation_name, step_name, consumers, counter_factory,
+  def __init__(self, name_context, step_name, consumers, counter_factory,
                state_sampler, windowed_coder, target, data_channel):
     super(RunnerIOOperation, self).__init__(
-        operation_name, None, counter_factory, state_sampler)
+        name_context, None, counter_factory, state_sampler)
     self.windowed_coder = windowed_coder
     self.windowed_coder_impl = windowed_coder.get_impl()
-    self.step_name = step_name
     # target represents the consumer for the bytes in the data plane for a
     # DataInputOperation or a producer of these bytes for a DataOutputOperation.
     self.target = target
@@ -106,9 +105,9 @@ def __init__(self, operation_name, step_name, consumers, counter_factory,
         windowed_coder, target=input_target, data_channel=data_channel)
     # We must do this manually as we don't have a spec or spec.output_coders.
     self.receivers = [
-        operations.ConsumerSet(self.counter_factory, self.step_name, 0,
-                               next(itervalues(consumers)),
-                               self.windowed_coder)]
+        operations.ConsumerSet(
+            self.counter_factory, self.name_context.step_name, 0,
+            next(itervalues(consumers)), self.windowed_coder)]
 
   def process(self, windowed_value):
     self.output(windowed_value)
diff --git a/sdks/python/apache_beam/runners/worker/logger.pxd b/sdks/python/apache_beam/runners/worker/logger.pxd
deleted file mode 100644
index 201daf4e29a..00000000000
--- a/sdks/python/apache_beam/runners/worker/logger.pxd
+++ /dev/null
@@ -1,25 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-cimport cython
-
-from apache_beam.runners.common cimport LoggingContext
-
-
-cdef class PerThreadLoggingContext(LoggingContext):
-  cdef kwargs
-  cdef list stack
diff --git a/sdks/python/apache_beam/runners/worker/logger.py b/sdks/python/apache_beam/runners/worker/logger.py
index 07cd320ff82..ae9cdd3ac75 100644
--- a/sdks/python/apache_beam/runners/worker/logger.py
+++ b/sdks/python/apache_beam/runners/worker/logger.py
@@ -26,7 +26,7 @@
 import threading
 import traceback
 
-from apache_beam.runners.common import LoggingContext
+from apache_beam.runners.worker import statesampler
 
 # This module is experimental. No backwards-compatibility guarantees.
 
@@ -38,7 +38,6 @@ class _PerThreadWorkerData(threading.local):
 
   def __init__(self):
     super(_PerThreadWorkerData, self).__init__()
-    # TODO(robertwb): Consider starting with an initial (ignored) ~20 elements
     # in the list, as going up and down all the way to zero incurs several
     # reallocations.
     self.stack = []
@@ -53,7 +52,7 @@ def get_data(self):
 per_thread_worker_data = _PerThreadWorkerData()
 
 
-class PerThreadLoggingContext(LoggingContext):
+class PerThreadLoggingContext(object):
   """A context manager to add per thread attributes."""
 
   def __init__(self, **kwargs):
@@ -150,10 +149,14 @@ def format(self, record):
     data = per_thread_worker_data.get_data()
     if 'work_item_id' in data:
       output['work'] = data['work_item_id']
-    if 'stage_name' in data:
-      output['stage'] = data['stage_name']
-    if 'step_name' in data:
-      output['step'] = data['step_name']
+
+    tracker = statesampler.get_current_tracker()
+    if tracker:
+      output['stage'] = tracker.stage_name
+
+      if tracker.current_state() and tracker.current_state().name_context:
+        output['step'] = tracker.current_state().name_context.logging_name()
+
     # All logging happens using the root logger. We will add the basename of the
     # file and the function name where the logging happened to make it easier
     # to identify who generated the record.
diff --git a/sdks/python/apache_beam/runners/worker/logger_test.py b/sdks/python/apache_beam/runners/worker/logger_test.py
index 73ec1aa3ad9..c131775b8e0 100644
--- a/sdks/python/apache_beam/runners/worker/logger_test.py
+++ b/sdks/python/apache_beam/runners/worker/logger_test.py
@@ -18,6 +18,7 @@
 """Tests for worker logging utilities."""
 
 from __future__ import absolute_import
+from __future__ import unicode_literals
 
 import json
 import logging
@@ -27,6 +28,8 @@
 from builtins import object
 
 from apache_beam.runners.worker import logger
+from apache_beam.runners.worker import statesampler
+from apache_beam.utils.counters import CounterFactory
 
 
 class PerThreadLoggingContextTest(unittest.TestCase):
@@ -129,30 +132,38 @@ def test_record_with_arbitrary_messages(self):
     self.execute_multiple_cases(test_cases)
 
   def test_record_with_per_thread_info(self):
-    with logger.PerThreadLoggingContext(
-        work_item_id='workitem', stage_name='stage', step_name='step'):
-      formatter = logger.JsonLogFormatter(job_id='jobid', worker_id='workerid')
-      record = self.create_log_record(**self.SAMPLE_RECORD)
-      log_output = json.loads(formatter.format(record))
+    self.maxDiff = None
+    tracker = statesampler.StateSampler('stage', CounterFactory())
+    statesampler.set_current_tracker(tracker)
+    formatter = logger.JsonLogFormatter(job_id='jobid', worker_id='workerid')
+    with logger.PerThreadLoggingContext(work_item_id='workitem'):
+      with tracker.scoped_state('step', 'process'):
+        record = self.create_log_record(**self.SAMPLE_RECORD)
+        log_output = json.loads(formatter.format(record))
     expected_output = dict(self.SAMPLE_OUTPUT)
     expected_output.update(
         {'work': 'workitem', 'stage': 'stage', 'step': 'step'})
     self.assertEqual(log_output, expected_output)
+    statesampler.set_current_tracker(None)
 
   def test_nested_with_per_thread_info(self):
+    self.maxDiff = None
+    tracker = statesampler.StateSampler('stage', CounterFactory())
+    statesampler.set_current_tracker(tracker)
     formatter = logger.JsonLogFormatter(job_id='jobid', worker_id='workerid')
-    with logger.PerThreadLoggingContext(
-        work_item_id='workitem', stage_name='stage', step_name='step1'):
-      record = self.create_log_record(**self.SAMPLE_RECORD)
-      log_output1 = json.loads(formatter.format(record))
-
-      with logger.PerThreadLoggingContext(step_name='step2'):
+    with logger.PerThreadLoggingContext(work_item_id='workitem'):
+      with tracker.scoped_state('step1', 'process'):
         record = self.create_log_record(**self.SAMPLE_RECORD)
-        log_output2 = json.loads(formatter.format(record))
+        log_output1 = json.loads(formatter.format(record))
 
-      record = self.create_log_record(**self.SAMPLE_RECORD)
-      log_output3 = json.loads(formatter.format(record))
+        with tracker.scoped_state('step2', 'process'):
+          record = self.create_log_record(**self.SAMPLE_RECORD)
+          log_output2 = json.loads(formatter.format(record))
+
+        record = self.create_log_record(**self.SAMPLE_RECORD)
+        log_output3 = json.loads(formatter.format(record))
 
+    statesampler.set_current_tracker(None)
     record = self.create_log_record(**self.SAMPLE_RECORD)
     log_output4 = json.loads(formatter.format(record))
 
diff --git a/sdks/python/apache_beam/runners/worker/operation_specs.py b/sdks/python/apache_beam/runners/worker/operation_specs.py
index 58ba5716c0e..d64920f18fe 100644
--- a/sdks/python/apache_beam/runners/worker/operation_specs.py
+++ b/sdks/python/apache_beam/runners/worker/operation_specs.py
@@ -378,9 +378,9 @@ def __init__(self, operations, stage_name,
                step_names=None,
                original_names=None,
                name_contexts=None):
+    # TODO(BEAM-4028): Remove arguments other than name_contexts.
     self.operations = operations
     self.stage_name = stage_name
-    # TODO(BEAM-4028): Remove arguments other than name_contexts.
     self.name_contexts = name_contexts or self._make_name_contexts(
         original_names, step_names, system_names)
 
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 143974eecf6..78a67bcd457 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -38,7 +38,6 @@
 from apache_beam.runners import common
 from apache_beam.runners.common import Receiver
 from apache_beam.runners.dataflow.internal.names import PropertyNames
-from apache_beam.runners.worker import logger
 from apache_beam.runners.worker import opcounters
 from apache_beam.runners.worker import operation_specs
 from apache_beam.runners.worker import sideinputs
@@ -127,10 +126,6 @@ def __init__(self, name_context, spec, counter_factory, state_sampler):
     else:
       self.name_context = common.NameContext(name_context)
 
-    # TODO(BEAM-4028): Remove following two lines. Rely on name context.
-    self.operation_name = self.name_context.step_name
-    self.step_name = self.name_context.logging_name()
-
     self.spec = spec
     self.counter_factory = counter_factory
     self.consumers = collections.defaultdict(list)
@@ -143,14 +138,11 @@ def __init__(self, name_context, spec, counter_factory, state_sampler):
 
     self.state_sampler = state_sampler
     self.scoped_start_state = self.state_sampler.scoped_state(
-        self.name_context.metrics_name(), 'start',
-        metrics_container=self.metrics_container)
+        self.name_context, 'start', metrics_container=self.metrics_container)
     self.scoped_process_state = self.state_sampler.scoped_state(
-        self.name_context.metrics_name(), 'process',
-        metrics_container=self.metrics_container)
+        self.name_context, 'process', metrics_container=self.metrics_container)
     self.scoped_finish_state = self.state_sampler.scoped_state(
-        self.name_context.metrics_name(), 'finish',
-        metrics_container=self.metrics_container)
+        self.name_context, 'finish', metrics_container=self.metrics_container)
     # TODO(ccy): the '-abort' state can be added when the abort is supported in
     # Operations.
     self.receivers = []
@@ -390,11 +382,9 @@ def start(self):
           fn, args, kwargs, self.side_input_maps, window_fn,
           tagged_receivers=self.tagged_receivers,
           step_name=self.name_context.logging_name(),
-          logging_context=logger.PerThreadLoggingContext(
-              step_name=self.name_context.logging_name()),
           state=state,
-          scoped_metrics_container=None,
           operation_name=self.name_context.metrics_name())
+
       self.dofn_receiver = (self.dofn_runner
                             if isinstance(self.dofn_runner, Receiver)
                             else DoFnRunnerReceiver(self.dofn_runner))
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py b/sdks/python/apache_beam/runners/worker/statesampler.py
index b0c2b67f9ff..b73029cf29c 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler.py
@@ -22,6 +22,7 @@
 import threading
 from collections import namedtuple
 
+from apache_beam.runners import common
 from apache_beam.utils.counters import Counter
 from apache_beam.utils.counters import CounterName
 
@@ -69,8 +70,18 @@ def __init__(self, prefix, counter_factory,
     self._states_by_name = {}
     self.sampling_period_ms = sampling_period_ms
     self.tracked_thread = None
+    self.finished = False
+    self.started = False
     super(StateSampler, self).__init__(sampling_period_ms)
 
+  @property
+  def stage_name(self):
+    return self._prefix
+
+  def stop(self):
+    set_current_tracker(None)
+    super(StateSampler, self).stop()
+
   def stop_if_still_running(self):
     if self.started and not self.finished:
       self.stop()
@@ -90,13 +101,28 @@ def get_info(self):
         self.tracked_thread)
 
   def scoped_state(self,
-                   step_name,
+                   name_context,
                    state_name,
                    io_target=None,
                    metrics_container=None):
+    """Returns a ScopedState object associated to a Step and a State.
+
+    Args:
+      name_context: common.NameContext. It is the step name information.
+      state_name: str. It is the state name (e.g. process / start / finish).
+      io_target:
+      metrics_container: MetricsContainer. The step's metrics container.
+
+    Returns:
+      A ScopedState that keeps the execution context and is able to switch it
+      for the execution thread.
+    """
+    if not isinstance(name_context, common.NameContext):
+      name_context = common.NameContext(name_context)
+
     counter_name = CounterName(state_name + '-msecs',
                                stage_name=self._prefix,
-                               step_name=step_name,
+                               step_name=name_context.metrics_name(),
                                io_target=io_target)
     if counter_name in self._states_by_name:
       return self._states_by_name[counter_name]
@@ -105,6 +131,7 @@ def scoped_state(self,
                                                          Counter.SUM)
       self._states_by_name[counter_name] = super(
           StateSampler, self)._scoped_state(counter_name,
+                                            name_context,
                                             output_counter,
                                             metrics_container)
       return self._states_by_name[counter_name]
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fast.pxd b/sdks/python/apache_beam/runners/worker/statesampler_fast.pxd
index 76b379b7a11..799bd0d4dbf 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_fast.pxd
+++ b/sdks/python/apache_beam/runners/worker/statesampler_fast.pxd
@@ -43,7 +43,8 @@ cdef class StateSampler(object):
 
   cdef int32_t current_state_index
 
-  cpdef _scoped_state(self, counter_name, output_counter, metrics_container)
+  cpdef _scoped_state(
+      self, counter_name, name_context, output_counter, metrics_container)
 
 cdef class ScopedState(object):
   """Context manager class managing transitions for a given sampler state."""
@@ -52,6 +53,7 @@ cdef class ScopedState(object):
   cdef readonly int32_t state_index
   cdef readonly object counter
   cdef readonly object name
+  cdef readonly object name_context
   cdef readonly int64_t _nsecs
   cdef int32_t old_state_index
   cdef readonly MetricsContainer _metrics_container
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
index fdf496979f9..8aa5217d8d1 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
+++ b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
@@ -90,8 +90,12 @@ cdef class StateSampler(object):
     self.current_state_index = 0
     self.time_since_transition = 0
     self.state_transition_count = 0
-    unknown_state = ScopedState(
-        self, CounterName('unknown'), self.current_state_index)
+    unknown_state = ScopedState(self,
+                                CounterName('unknown'),
+                                None,
+                                self.current_state_index,
+                                None,
+                                None)
     pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
     self.scoped_states_by_index = [unknown_state]
     pythread.PyThread_release_lock(self.lock)
@@ -153,7 +157,7 @@ cdef class StateSampler(object):
   def current_state(self):
     return self.scoped_states_by_index[self.current_state_index]
 
-  cpdef _scoped_state(self, counter_name, output_counter,
+  cpdef _scoped_state(self, counter_name, name_context, output_counter,
                       metrics_container):
     """Returns a context manager managing transitions for a given state.
     Args:
@@ -168,6 +172,7 @@ cdef class StateSampler(object):
     new_state_index = len(self.scoped_states_by_index)
     scoped_state = ScopedState(self,
                                counter_name,
+                               name_context,
                                new_state_index,
                                output_counter,
                                metrics_container)
@@ -183,10 +188,16 @@ cdef class StateSampler(object):
 cdef class ScopedState(object):
   """Context manager class managing transitions for a given sampler state."""
 
-  def __init__(
-      self, sampler, name, state_index, counter=None, metrics_container=None):
+  def __init__(self,
+               sampler,
+               name,
+               step_name_context,
+               state_index,
+               counter,
+               metrics_container):
     self.sampler = sampler
     self.name = name
+    self.name_context = step_name_context
     self.state_index = state_index
     self.counter = counter
     self._metrics_container = metrics_container
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_slow.py b/sdks/python/apache_beam/runners/worker/statesampler_slow.py
index 2f09d0e8bb2..4b1bf830073 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_slow.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_slow.py
@@ -21,15 +21,18 @@
 
 from builtins import object
 
+from apache_beam.runners import common
+from apache_beam.utils import counters
+
 
 class StateSampler(object):
 
   def __init__(self, sampling_period_ms):
-    self._state_stack = [ScopedState(None, self, None)]
+    self._state_stack = [ScopedState(self,
+                                     counters.CounterName('unknown'),
+                                     None)]
     self.state_transition_count = 0
     self.time_since_transition = 0
-    self.started = False
-    self.finished = False
 
   def current_state(self):
     """Returns the current execution state.
@@ -40,9 +43,12 @@ def current_state(self):
 
   def _scoped_state(self,
                     counter_name,
+                    name_context,
                     output_counter,
                     metrics_container=None):
-    return ScopedState(self, counter_name, output_counter, metrics_container)
+    assert isinstance(name_context, common.NameContext)
+    return ScopedState(
+        self, counter_name, name_context, output_counter, metrics_container)
 
   def _enter_state(self, state):
     self.state_transition_count += 1
@@ -57,14 +63,16 @@ def start(self):
     pass
 
   def stop(self):
-    self.finished = True
+    pass
 
 
 class ScopedState(object):
 
-  def __init__(self, sampler, name, counter=None, metrics_container=None):
+  def __init__(self, sampler, name, step_name_context,
+               counter=None, metrics_container=None):
     self.state_sampler = sampler
     self.name = name
+    self.name_context = step_name_context
     self.counter = counter
     self.nsecs = 0
     self.metrics_container = metrics_container


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 120984)
    Time Spent: 19h  (was: 18h 50m)

> State tracking in Python is inefficient and has duplicated code
> ---------------------------------------------------------------
>
>                 Key: BEAM-2732
>                 URL: https://issues.apache.org/jira/browse/BEAM-2732
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Assignee: Pablo Estrada
>            Priority: Major
>          Time Spent: 19h
>  Remaining Estimate: 0h
>
> e.g. logging and metrics keep state separately. State tracking should be unified.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to