baeminbo opened a new issue, #36022:
URL: https://github.com/apache/beam/issues/36022

   ### What happened?
   
   `_log_lull_in_bundle_processor` iterates over the bundle processors in 
[active_bundle_processors.keys()](https://github.com/apache/beam/blob/v2.67.0/sdks/python/apache_beam/runners/worker/worker_status.py#L248)
 and logs `Operation ongoing in bundle ...` for stalled worker threads.
   
   However, 
[active_bundle_processors](https://github.com/apache/beam/blob/v2.67.0/sdks/python/apache_beam/runners/worker/sdk_worker.py#L520)
 is updated only after the `BundleProcessor` has been created, and creating it 
deserializes the `DoFn` objects. 
   
   Therefore, if a worker thread stalls while deserializing a `DoFn`, that 
thread is not reported in the `Operation ongoing ...` warning log. 
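
To make the gap concrete, here is a minimal sketch in plain Python. It is a toy model, not Beam's code: `ToyBundleProcessorCache`, `visible_to_lull_check`, and the two events are hypothetical names, and waiting on an event stands in for slow `DoFn` deserialization.

```python
import threading


class ToyBundleProcessorCache:
    """Hypothetical stand-in for the cache; it registers only after construction."""

    def __init__(self):
        self.active_bundle_processors = {}
        self.construction_started = threading.Event()
        self.allow_construction_to_finish = threading.Event()

    def get(self, instruction_id):
        # Step 1: "construct" the processor. Blocking here models a stall
        # during DoFn deserialization.
        self.construction_started.set()
        self.allow_construction_to_finish.wait()
        processor = object()
        # Step 2: only now does the instruction become visible to observers.
        self.active_bundle_processors[instruction_id] = processor
        return processor


def visible_to_lull_check(cache):
    """What a check that only walks active_bundle_processors can see."""
    return list(cache.active_bundle_processors.keys())


cache = ToyBundleProcessorCache()
worker = threading.Thread(target=cache.get, args=("instruction-1",))
worker.start()

# The worker is now inside construction, but the check sees nothing.
cache.construction_started.wait()
seen_while_stalled = visible_to_lull_check(cache)

# Let construction finish; the instruction is registered afterwards.
cache.allow_construction_to_finish.set()
worker.join()
seen_after_registration = visible_to_lull_check(cache)

print(seen_while_stalled)       # []
print(seen_after_registration)  # ['instruction-1']
```

In this model, a check that only reads `active_bundle_processors` has no way to notice the worker for as long as construction is blocked.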
   
   --- 
   
   The following is an example stack trace for a worker thread stalled in 
`DoFn` deserialization (a long sleep was added to `__setstate__` to simulate 
the issue). The `BundleProcessor` is created at `sdk_worker.py:511`, and 
`active_bundle_processors` is updated at `sdk_worker.py:520`.
   
   ```
   --- Thread #138116862035648 name: Thread-16 ---
     File "/usr/local/lib/python3.11/threading.py", line 1002, in _bootstrap
       self._bootstrap_inner()
     File "/usr/local/lib/python3.11/threading.py", line 1045, in 
_bootstrap_inner
       self.run()
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/utils/thread_pool_executor.py",
 line 53, in run
       self._work_item.run()
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/utils/thread_pool_executor.py",
 line 37, in run
       self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 386, in task
       self._execute(
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 313, in _execute
       response = task()
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 387, in 
       lambda: self.create_worker().do_instruction(request), request)
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 659, in do_instruction
       return getattr(self, request_type)(
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 690, in process_bundle
       bundle_processor = self.bundle_processor_cache.get(
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 511, in get
       processor = bundle_processor.BundleProcessor(
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1133, in __init__
       self.ops = self.create_execution_tree(self.process_bundle_descriptor)
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1190, in create_execution_tree
       return collections.OrderedDict([(
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1193, in 
       get_operation(transform_id))) for transform_id in sorted(
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1038, in wrapper
       result = cache[args] = func(*args)
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1167, in get_operation
       transform_consumers = {
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in 
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in 
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1038, in wrapper
       result = cache[args] = func(*args)
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1167, in get_operation
       transform_consumers = {
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in 
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in 
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1038, in wrapper
       result = cache[args] = func(*args)
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1167, in get_operation
       transform_consumers = {
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in 
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in 
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1038, in wrapper
       result = cache[args] = func(*args)
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1178, in get_operation
       return transform_factory.create_operation(
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1497, in create_operation
       return creator(self, transform_id, transform_proto, payload, consumers)
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1828, in create_par_do
       return _create_pardo_operation(
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1872, in _create_pardo_operation
       dofn_data = pickler.loads(serialized_fn)
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/internal/pickler.py", line 
57, in loads
       return desired_pickle_lib.loads(
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/internal/cloudpickle_pickler.py",
 line 176, in loads
       unpickled = cloudpickle.loads(s)
     File 
"/Users/baeminbo/Documents/workspace/dataflow-pipelines/py-subprocess/pipeline.py",
 line 30, in __setstate__
   ```
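
For anyone trying to reproduce this, the simulated stall can be sketched roughly as below. This is an assumption-laden illustration, not the reporter's actual `pipeline.py`: `SlowToRestore` and `STALL_SECONDS` are hypothetical names, the class is a plain Python object (in a real pipeline it would subclass `apache_beam.DoFn`), and the standard `pickle` module stands in for Beam's pickler.

```python
import pickle
import time


class SlowToRestore:
    """Hypothetical stand-in for a DoFn whose deserialization stalls."""

    # Kept short for the demo; the issue describes a long sleep.
    STALL_SECONDS = 0.2

    def __init__(self):
        # Non-empty state, so that unpickling calls __setstate__.
        self.label = "example"

    def __setstate__(self, state):
        # Runs during unpickling, i.e. while the object is being restored.
        time.sleep(self.STALL_SECONDS)
        self.__dict__.update(state)


payload = pickle.dumps(SlowToRestore())

start = time.monotonic()
restored = pickle.loads(payload)  # blocks inside __setstate__
elapsed = time.monotonic() - start

print(f"restored label={restored.label!r} after {elapsed:.2f}s")
```

The point of the sketch is only that the delay happens inside `loads`, which matches the last frames of the trace above (`pickler.loads` leading into `__setstate__`).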
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

