This is an automated email from the ASF dual-hosted git repository. yichi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new d296688 Use daemon thread to report lull operation and move code to FnApiWorkerStatusHandler new bd0ac38 Merge pull request #15676 from y1chi/lull_logging d296688 is described below commit d2966886ae3d990056a71257d062e6067134675e Author: Yichi Zhang <zyi...@google.com> AuthorDate: Wed Oct 6 17:51:58 2021 -0700 Use daemon thread to report lull operation and move code to FnApiWorkerStatusHandler --- .../portability/fn_api_runner/fn_runner_test.py | 20 ------ .../apache_beam/runners/worker/sdk_worker.py | 76 ---------------------- .../apache_beam/runners/worker/sdk_worker_test.py | 62 ------------------ .../apache_beam/runners/worker/worker_status.py | 76 +++++++++++++++++++++- .../runners/worker/worker_status_test.py | 41 ++++++++++++ 5 files changed, 116 insertions(+), 159 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index c002963..ece4f28 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -58,13 +58,11 @@ from apache_beam.runners.portability import fn_api_runner from apache_beam.runners.portability.fn_api_runner import fn_runner from apache_beam.runners.sdf_utils import RestrictionTrackerView from apache_beam.runners.worker import data_plane -from apache_beam.runners.worker import sdk_worker from apache_beam.runners.worker import statesampler from apache_beam.testing.synthetic_pipeline import SyntheticSDFAsSource from apache_beam.testing.test_stream import TestStream from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to -from apache_beam.tools import utils from apache_beam.transforms import environments from apache_beam.transforms import userstate from apache_beam.transforms import window @@ -2002,24 +2000,6 @@ class FnApiBasedLullLoggingTest(unittest.TestCase): default(), progress_request_frequency=0.5)) - def test_lull_logging(self): - - try: - utils.check_compiled('apache_beam.runners.worker.opcounters') - except RuntimeError: - self.skipTest('Cython is not available') - - with self.assertLogs(level='WARNING') as logs: - with self.create_pipeline() as p: - sdk_worker.DEFAULT_LOG_LULL_TIMEOUT_NS = 1000 * 1000 # Lull after 1 ms - - _ = (p | beam.Create([1]) | beam.Map(time.sleep)) - - self.assertRegex( - ''.join(logs.output), - '.*Operation ongoing for over.*', - 'Unable to find a lull logged for this job.') - class StateBackedTestElementType(object): live_element_count = 0 diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 6fdca4d..0951401 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -63,7 +63,6 @@ from apache_beam.runners.worker.data_plane import PeriodicThread from apache_beam.runners.worker.statecache import StateCache from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor from apache_beam.runners.worker.worker_status import FnApiWorkerStatusHandler -from apache_beam.runners.worker.worker_status import thread_dump from apache_beam.utils import thread_pool_executor from apache_beam.utils.sentinel import Sentinel @@ -77,19 +76,8 @@ _VT = TypeVar('_VT') _LOGGER = logging.getLogger(__name__) -# This SDK harness will (by default), log a "lull" in processing if it sees no -# transitions in over 5 minutes. -# 5 minutes * 60 seconds * 1000 millis * 1000 micros * 1000 nanoseconds -DEFAULT_LOG_LULL_TIMEOUT_NS = 5 * 60 * 1000 * 1000 * 1000 - DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S = 60 -# Full thread dump is performed at most every 20 minutes. -LOG_LULL_FULL_THREAD_DUMP_INTERVAL_S = 20 * 60 - -# Full thread dump is performed if the lull is more than 20 minutes. -LOG_LULL_FULL_THREAD_DUMP_LULL_S = 20 * 60 - # The number of ProcessBundleRequest instruction ids the BundleProcessorCache # will remember for not running instructions. MAX_KNOWN_NOT_RUNNING_INSTRUCTIONS = 1000 @@ -578,16 +566,11 @@ class SdkWorker(object): bundle_processor_cache, # type: BundleProcessorCache state_cache_metrics_fn=list, # type: Callable[[], Iterable[metrics_pb2.MonitoringInfo]] profiler_factory=None, # type: Optional[Callable[..., Profile]] - log_lull_timeout_ns=None, # type: Optional[int] ): # type: (...) -> None self.bundle_processor_cache = bundle_processor_cache self.state_cache_metrics_fn = state_cache_metrics_fn self.profiler_factory = profiler_factory - self.log_lull_timeout_ns = ( - log_lull_timeout_ns or DEFAULT_LOG_LULL_TIMEOUT_NS) - self._last_full_thread_dump_secs = 0.0 - self._last_lull_logged_secs = 0.0 def do_instruction(self, request): # type: (beam_fn_api_pb2.InstructionRequest) -> beam_fn_api_pb2.InstructionResponse @@ -674,64 +657,6 @@ class SdkWorker(object): instruction_id=instruction_id, process_bundle_split=process_bundle_split) - def _log_lull_in_bundle_processor(self, processor): - # type: (bundle_processor.BundleProcessor) -> None - sampler_info = processor.state_sampler.get_info() - self._log_lull_sampler_info(sampler_info) - - def _log_lull_sampler_info(self, sampler_info): - # type: (statesampler.StateSamplerInfo) -> None - if (sampler_info and sampler_info.time_since_transition and - sampler_info.time_since_transition > self.log_lull_timeout_ns and - self._passed_lull_timeout_since_last_log()): - step_name = sampler_info.state_name.step_name - state_name = sampler_info.state_name.name - lull_seconds = sampler_info.time_since_transition / 1e9 - state_lull_log = ( - 'Operation ongoing for over %.2f seconds in state %s' % - (lull_seconds, state_name)) - step_name_log = (' in step %s ' % step_name) if step_name else '' - - exec_thread = getattr(sampler_info, 'tracked_thread', None) - if exec_thread is not None: - thread_frame = sys._current_frames().get(exec_thread.ident) # pylint: disable=protected-access - stack_trace = '\n'.join( - traceback.format_stack(thread_frame)) if thread_frame else '' - else: - stack_trace = '-NOT AVAILABLE-' - - _LOGGER.warning( - '%s%s without returning. Current Traceback:\n%s', - state_lull_log, - step_name_log, - stack_trace) - - if self._should_log_full_thread_dump(lull_seconds): - self._log_full_thread_dump() - - def _passed_lull_timeout_since_last_log(self) -> bool: - if (time.time() - self._last_lull_logged_secs > - self.log_lull_timeout_ns / 1e9): - self._last_lull_logged_secs = time.time() - return True - else: - return False - - def _should_log_full_thread_dump(self, lull_seconds): - # type: (float) -> bool - if lull_seconds < LOG_LULL_FULL_THREAD_DUMP_LULL_S: - return False - now = time.time() - if (self._last_full_thread_dump_secs + LOG_LULL_FULL_THREAD_DUMP_INTERVAL_S - < now): - self._last_full_thread_dump_secs = now - return True - return False - - def _log_full_thread_dump(self): - # type: () -> None - thread_dump() - def process_bundle_progress( self, request, # type: beam_fn_api_pb2.ProcessBundleProgressRequest @@ -744,7 +669,6 @@ class SdkWorker(object): return beam_fn_api_pb2.InstructionResponse( instruction_id=instruction_id, error=traceback.format_exc()) if processor: - self._log_lull_in_bundle_processor(processor) monitoring_infos = processor.monitoring_infos() else: # Return an empty response if we aren't running. This can happen diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py index 92a2f72..d7309c1 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py @@ -21,8 +21,6 @@ import contextlib import logging -import threading -import time import unittest from collections import namedtuple @@ -40,12 +38,10 @@ from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.portability.api import metrics_pb2 from apache_beam.runners.worker import sdk_worker from apache_beam.runners.worker import statecache -from apache_beam.runners.worker import statesampler from apache_beam.runners.worker.sdk_worker import BundleProcessorCache from apache_beam.runners.worker.sdk_worker import GlobalCachingStateHandler from apache_beam.runners.worker.sdk_worker import SdkWorker from apache_beam.utils import thread_pool_executor -from apache_beam.utils.counters import CounterName _LOGGER = logging.getLogger(__name__) @@ -127,13 +123,6 @@ class SdkWorkerTest(unittest.TestCase): def test_fn_registration(self): self._check_fn_registration_multi_request((1, 4), (4, 4)) - def _get_state_sampler_info_for_lull(self, lull_duration_s): - return statesampler.StateSamplerInfo( - CounterName('progress-msecs', 'stage_name', 'step_name'), - 1, - lull_duration_s * 1e9, - threading.current_thread()) - def test_inactive_bundle_processor_returns_empty_progress_response(self): bundle_processor = mock.MagicMock() bundle_processor_cache = BundleProcessorCache(None, None, {}) @@ -283,57 +272,6 @@ class SdkWorkerTest(unittest.TestCase): hc.contains_string( 'Bundle processing associated with instruction_id has failed')) - def test_log_lull_in_bundle_processor(self): - bundle_processor_cache = mock.MagicMock() - worker = SdkWorker(bundle_processor_cache) - - now = time.time() - log_full_thread_dump_fn_name = \ - 'apache_beam.runners.worker.sdk_worker.SdkWorker._log_full_thread_dump' - with mock.patch('logging.Logger.warning') as warn_mock: - with mock.patch(log_full_thread_dump_fn_name) as log_full_thread_dump: - with mock.patch('time.time') as time_mock: - time_mock.return_value = now - sampler_info = self._get_state_sampler_info_for_lull(21 * 60) - worker._log_lull_sampler_info(sampler_info) - - processing_template = warn_mock.call_args[0][1] - step_name_template = warn_mock.call_args[0][2] - traceback = warn_mock.call_args = warn_mock.call_args[0][3] - - self.assertIn('progress-msecs', processing_template) - self.assertIn('step_name', step_name_template) - self.assertIn('test_log_lull_in_bundle_processor', traceback) - - log_full_thread_dump.assert_called_once_with() - - with mock.patch(log_full_thread_dump_fn_name) as log_full_thread_dump: - with mock.patch('time.time') as time_mock: - time_mock.return_value = now + 6 * 60 # 6 minutes - sampler_info = self._get_state_sampler_info_for_lull(21 * 60) - worker._log_lull_sampler_info(sampler_info) - self.assertFalse( - log_full_thread_dump.called, - 'log_full_thread_dump should not be called because only 6 minutes ' - 'have passed since the last dump.') - - with mock.patch(log_full_thread_dump_fn_name) as log_full_thread_dump: - with mock.patch('time.time') as time_mock: - time_mock.return_value = now + 21 * 60 # 21 minutes - sampler_info = self._get_state_sampler_info_for_lull(10 * 60) - worker._log_lull_sampler_info(sampler_info) - self.assertFalse( - log_full_thread_dump.called, - 'log_full_thread_dump should not be called because lull is only ' - 'for 10 minutes.') - - with mock.patch(log_full_thread_dump_fn_name) as log_full_thread_dump: - with mock.patch('time.time') as time_mock: - time_mock.return_value = now + 42 * 60 # 21 minutes after previous one - sampler_info = self._get_state_sampler_info_for_lull(21 * 60) - worker._log_lull_sampler_info(sampler_info) - log_full_thread_dump.assert_called_once_with() - class CachingStateHandlerTest(unittest.TestCase): def test_caching(self): diff --git a/sdks/python/apache_beam/runners/worker/worker_status.py b/sdks/python/apache_beam/runners/worker/worker_status.py index b699f07..652b01d 100644 --- a/sdks/python/apache_beam/runners/worker/worker_status.py +++ b/sdks/python/apache_beam/runners/worker/worker_status.py @@ -17,9 +17,11 @@ """Worker status api handler for reporting SDK harness debug info.""" +import logging import queue import sys import threading +import time import traceback from collections import defaultdict @@ -36,6 +38,19 @@ try: except ImportError: hpy = None +_LOGGER = logging.getLogger(__name__) + +# This SDK harness will (by default), log a "lull" in processing if it sees no +# transitions in over 5 minutes. +# 5 minutes * 60 seconds * 1000 millis * 1000 micros * 1000 nanoseconds +DEFAULT_LOG_LULL_TIMEOUT_NS = 5 * 60 * 1000 * 1000 * 1000 + +# Full thread dump is performed at most every 20 minutes. +LOG_LULL_FULL_THREAD_DUMP_INTERVAL_S = 20 * 60 + +# Full thread dump is performed if the lull is more than 20 minutes. +LOG_LULL_FULL_THREAD_DUMP_LULL_S = 20 * 60 + def thread_dump(): """Get a thread dump for the current SDK worker harness. """ @@ -120,7 +135,11 @@ DONE = Sentinel.sentinel class FnApiWorkerStatusHandler(object): """FnApiWorkerStatusHandler handles worker status request from Runner. """ def __init__( - self, status_address, bundle_process_cache=None, enable_heap_dump=False): + self, + status_address, + bundle_process_cache=None, + enable_heap_dump=False, + log_lull_timeout_ns=DEFAULT_LOG_LULL_TIMEOUT_NS): """Initialize FnApiWorkerStatusHandler. Args: @@ -135,11 +154,20 @@ class FnApiWorkerStatusHandler(object): self._status_stub = beam_fn_api_pb2_grpc.BeamFnWorkerStatusStub( self._status_channel) self._responses = queue.Queue() + self.log_lull_timeout_ns = log_lull_timeout_ns + self._last_full_thread_dump_secs = 0.0 + self._last_lull_logged_secs = 0.0 self._server = threading.Thread( target=lambda: self._serve(), name='fn_api_status_handler') self._server.daemon = True self._enable_heap_dump = enable_heap_dump self._server.start() + self._lull_logger = threading.Thread( + target=lambda: self._log_lull_in_bundle_processor( + self._bundle_process_cache), + name='lull_operation_logger') + self._lull_logger.daemon = True + self._lull_logger.start() def _get_responses(self): while True: @@ -177,3 +205,49 @@ class FnApiWorkerStatusHandler(object): def close(self): self._responses.put(DONE, timeout=5) + + def _log_lull_in_bundle_processor(self, bundle_process_cache): + while True: + time.sleep(2 * 60) + if bundle_process_cache.active_bundle_processors: + for instruction in list( + bundle_process_cache.active_bundle_processors.keys()): + processor = bundle_process_cache.lookup(instruction) + if processor: + info = processor.state_sampler.get_info() + self._log_lull_sampler_info(info) + + def _log_lull_sampler_info(self, sampler_info): + if not self._passed_lull_timeout_since_last_log(): + return + if (sampler_info and sampler_info.time_since_transition and + sampler_info.time_since_transition > self.log_lull_timeout_ns): + step_name = sampler_info.state_name.step_name + state_name = sampler_info.state_name.name + lull_seconds = sampler_info.time_since_transition / 1e9 + state_lull_log = ( + 'Operation ongoing for over %.2f seconds in state %s' % + (lull_seconds, state_name)) + step_name_log = (' in step %s ' % step_name) if step_name else '' + + exec_thread = getattr(sampler_info, 'tracked_thread', None) + if exec_thread is not None: + thread_frame = sys._current_frames().get(exec_thread.ident) # pylint: disable=protected-access + stack_trace = '\n'.join( + traceback.format_stack(thread_frame)) if thread_frame else '' + else: + stack_trace = '-NOT AVAILABLE-' + + _LOGGER.warning( + '%s%s without returning. Current Traceback:\n%s', + state_lull_log, + step_name_log, + stack_trace) + + def _passed_lull_timeout_since_last_log(self) -> bool: + if (time.time() - self._last_lull_logged_secs > + self.log_lull_timeout_ns / 1e9): + self._last_lull_logged_secs = time.time() + return True + else: + return False diff --git a/sdks/python/apache_beam/runners/worker/worker_status_test.py b/sdks/python/apache_beam/runners/worker/worker_status_test.py index 2b4c7dc..7da5339 100644 --- a/sdks/python/apache_beam/runners/worker/worker_status_test.py +++ b/sdks/python/apache_beam/runners/worker/worker_status_test.py @@ -17,6 +17,7 @@ import logging import threading +import time import unittest import grpc @@ -24,9 +25,11 @@ import mock from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc +from apache_beam.runners.worker import statesampler from apache_beam.runners.worker.worker_status import FnApiWorkerStatusHandler from apache_beam.runners.worker.worker_status import heap_dump from apache_beam.utils import thread_pool_executor +from apache_beam.utils.counters import CounterName class BeamFnStatusServicer(beam_fn_api_pb2_grpc.BeamFnWorkerStatusServicer): @@ -83,6 +86,44 @@ class FnApiWorkerStatusHandlerTest(unittest.TestCase): self.assertIsNotNone(response.error) self.fn_status_handler.close() + def test_log_lull_in_bundle_processor(self): + def get_state_sampler_info_for_lull(lull_duration_s): + return statesampler.StateSamplerInfo( + CounterName('progress-msecs', 'stage_name', 'step_name'), + 1, + lull_duration_s * 1e9, + threading.current_thread()) + + now = time.time() + with mock.patch('logging.Logger.warning') as warn_mock: + with mock.patch('time.time') as time_mock: + time_mock.return_value = now + sampler_info = get_state_sampler_info_for_lull(21 * 60) + self.fn_status_handler._log_lull_sampler_info(sampler_info) + + processing_template = warn_mock.call_args[0][1] + step_name_template = warn_mock.call_args[0][2] + traceback = warn_mock.call_args = warn_mock.call_args[0][3] + + self.assertIn('progress-msecs', processing_template) + self.assertIn('step_name', step_name_template) + self.assertIn('test_log_lull_in_bundle_processor', traceback) + + with mock.patch('time.time') as time_mock: + time_mock.return_value = now + 6 * 60 # 6 minutes + sampler_info = get_state_sampler_info_for_lull(21 * 60) + self.fn_status_handler._log_lull_sampler_info(sampler_info) + + with mock.patch('time.time') as time_mock: + time_mock.return_value = now + 21 * 60 # 21 minutes + sampler_info = get_state_sampler_info_for_lull(10 * 60) + self.fn_status_handler._log_lull_sampler_info(sampler_info) + + with mock.patch('time.time') as time_mock: + time_mock.return_value = now + 42 * 60 # 21 minutes after previous one + sampler_info = get_state_sampler_info_for_lull(21 * 60) + self.fn_status_handler._log_lull_sampler_info(sampler_info) + class HeapDumpTest(unittest.TestCase): @mock.patch('apache_beam.runners.worker.worker_status.hpy', None)