baeminbo opened a new issue, #36022: URL: https://github.com/apache/beam/issues/36022
### What happened?

`_log_lull_in_bundle_processor` examines the bundle processors in [active_bundle_processors.keys()](https://github.com/apache/beam/blob/v2.67.0/sdks/python/apache_beam/runners/worker/worker_status.py#L248) to report `Operation ongoing in bundle ...` for stalled worker threads. However, [active_bundle_processors](https://github.com/apache/beam/blob/v2.67.0/sdks/python/apache_beam/runners/worker/sdk_worker.py#L520) is updated only after the `BundleProcessor` has been created, and `DoFn` objects are deserialized during that creation. Therefore, if a worker thread stalls while deserializing a `DoFn`, the thread is not reported in the `Operation ongoing ...` warning log.

---

The following is an example stack trace for a worker thread stalled in `DoFn` deserialization (a long sleep was placed in `__setstate__` to simulate the issue). The `BundleProcessor` is created at `sdk_worker.py:511`, and `active_bundle_processors` is updated at `sdk_worker.py:520`.

```
--- Thread #138116862035648 name: Thread-16 ---
  File "/usr/local/lib/python3.11/threading.py", line 1002, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.11/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
    self._work_item.run()
  File "/usr/local/lib/python3.11/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 386, in task
    self._execute(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in _execute
    response = task()
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 387, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 659, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 690, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 511, in get
    processor = bundle_processor.BundleProcessor(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1133, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1190, in create_execution_tree
    return collections.OrderedDict([(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1193, in <listcomp>
    get_operation(transform_id))) for transform_id in sorted(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1178, in get_operation
    return transform_factory.create_operation(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1497, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1828, in create_par_do
    return _create_pardo_operation(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1872, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/internal/pickler.py", line 57, in loads
    return desired_pickle_lib.loads(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/internal/cloudpickle_pickler.py", line 176, in loads
    unpickled = cloudpickle.loads(s)
  File "/Users/baeminbo/Documents/workspace/dataflow-pipelines/py-subprocess/pipeline.py", line 30, in __setstate__
```

### Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

### Issue Components

- [ ] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner

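The ordering gap described under "What happened?" can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not Beam's code: `SlowToLoadFn`, `load_fn`, `ToyProcessorCache`, and `observe_during_and_after_load` are hypothetical names, and the model mirrors only the reported order of operations (deserialize first, register as active afterwards).

```python
import threading
import time


class SlowToLoadFn:
    """Hypothetical stand-in for a DoFn whose deserialization stalls."""

    def __init__(self):
        self.value = 1

    def __setstate__(self, state):
        # Simulated stall, similar in spirit to the long sleep in the issue.
        time.sleep(0.3)
        self.__dict__.update(state)


def load_fn(state):
    # Mimics what unpickling does: allocate the object without calling
    # __init__, then restore its state through __setstate__.
    fn = SlowToLoadFn.__new__(SlowToLoadFn)
    fn.__setstate__(state)
    return fn


class ToyProcessorCache:
    """Toy model of the reported ordering; not Beam's real cache class."""

    def __init__(self):
        self.active_bundle_processors = {}
        self.loading_started = threading.Event()
        self._lock = threading.Lock()

    def get(self, instruction_id, state):
        self.loading_started.set()
        # Step 1: build the processor, which deserializes the function.
        processor = load_fn(state)
        # Step 2: only now does the instruction become visible as active.
        with self._lock:
            self.active_bundle_processors[instruction_id] = processor
        return processor

    def visible_instructions(self):
        # What a checker that only iterates the active map would see.
        with self._lock:
            return sorted(self.active_bundle_processors.keys())


def observe_during_and_after_load():
    cache = ToyProcessorCache()
    worker = threading.Thread(
        target=cache.get, args=("instruction-1", {"value": 1}))
    worker.start()
    cache.loading_started.wait()  # worker is now about to deserialize
    during = cache.visible_instructions()
    worker.join()
    after = cache.visible_instructions()
    return during, after


if __name__ == "__main__":
    during, after = observe_during_and_after_load()
    print("visible while deserializing:", during)
    print("visible after deserializing:", after)
```

In this model, `instruction-1` is missing from the first list and present in the second, which matches the reported symptom: a checker that only iterates `active_bundle_processors` cannot see a thread that is still deserializing.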