cozos opened a new issue, #24776:
URL: https://github.com/apache/beam/issues/24776

   ### What happened?
   
   Hello, I am on Apache Beam v2.35.0 running on GCP Dataflow, and I've encountered what I believe are race conditions in the progress-reporting machinery (i.e. `process_bundle_progress` / `ProcessBundleProgressRequest`):
   
   ```
   Error processing instruction process_bundle_progress-1213241858972398550-1099. Original traceback is
   Traceback (most recent call last):
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 267, in _execute
       response = task()
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 581, in do_instruction
       getattr(request, request_type), request.instruction_id)
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 672, in process_bundle_progress
       monitoring_infos = processor.monitoring_infos()
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/bundle_processor.py", line 1131, in monitoring_infos
       op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
     File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.monitoring_infos
     File "apache_beam/runners/worker/operations.py", line 360, in apache_beam.runners.worker.operations.Operation.monitoring_infos
     File "apache_beam/runners/worker/operations.py", line 425, in apache_beam.runners.worker.operations.Operation.execution_time_monitoring_infos
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/metrics/monitoring_infos.py", line 204, in int64_counter
       return create_monitoring_info(urn, SUM_INT64_TYPE, metric, labels)
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/metrics/monitoring_infos.py", line 303, in create_monitoring_info
       urn=urn, type=type_urn, labels=labels or {}, payload=payload)
   SystemError: <class 'metrics_pb2.MonitoringInfo'> returned NULL without setting an error
   ```
   
   ```
   Error processing instruction process_bundle_progress-5696618351405637733-1593. Original traceback is
   Traceback (most recent call last):
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 267, in _execute
       response = task()
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 581, in do_instruction
       getattr(request, request_type), request.instruction_id)
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 672, in process_bundle_progress
       monitoring_infos = processor.monitoring_infos()
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/bundle_processor.py", line 1131, in monitoring_infos
       op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
     File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.monitoring_infos
     File "apache_beam/runners/worker/operations.py", line 360, in apache_beam.runners.worker.operations.Operation.monitoring_infos
     File "apache_beam/runners/worker/operations.py", line 413, in apache_beam.runners.worker.operations.Operation.execution_time_monitoring_infos
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/metrics/monitoring_infos.py", line 204, in int64_counter
       return create_monitoring_info(urn, SUM_INT64_TYPE, metric, labels)
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/metrics/monitoring_infos.py", line 303, in create_monitoring_info
       urn=urn, type=type_urn, labels=labels or {}, payload=payload)
     File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/_collections_abc.py", line 840, in update
       for key in other:
   RuntimeError: dictionary changed size during iteration
   ```
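   For anyone triaging: the `RuntimeError` above is CPython's generic complaint when a dict is mutated during iteration. In this trace, protobuf's map-field `update()` walks the labels mapping with `for key in other:`, presumably while the bundle-processing thread is still adding labels. A minimal, deterministic sketch of that error class (plain Python, not Beam code; the concurrent write is inlined so it reproduces without threads):

   ```python
   # Deterministic illustration of "dictionary changed size during iteration".
   # In the real failure, the writer is another thread; here the write is
   # inlined mid-iteration so the error triggers every time.
   def copy_labels(src):
       out = {}
       for key in src:               # iteration begins over the shared dict...
           out[key] = src[key]
           src["extra_label"] = "x"  # ...a writer inserts a key mid-iteration
       return out

   labels = {"PTRANSFORM": "my_transform"}
   try:
       copy_labels(labels)
       raised = False
   except RuntimeError as e:
       raised = True
       message = str(e)

   print(raised, message)  # True dictionary changed size during iteration
   ```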
   
   ```
   Error processing instruction process_bundle_progress-6997054913682226470-568. Original traceback is
   Traceback (most recent call last):
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 267, in _execute
       response = task()
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 581, in do_instruction
       getattr(request, request_type), request.instruction_id)
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 683, in process_bundle_progress
       for info in monitoring_infos
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 683, in <dictcomp>
       for info in monitoring_infos
   AttributeError: 'bytes' object has no attribute 'payload'
   ```
   
   I am running long-running C++ code through pybind11, which I think might be a contributing factor. However, my C++ code does not access any Python objects without holding the GIL, and it definitely doesn't touch anything related to progress reporting.
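
   In case it helps whoever investigates: the usual mitigation for this class of race is to snapshot shared mutable state under a lock before handing it to the side that iterates it. This is a hypothetical sketch only; `LabelStore` and its methods are illustrative names, not Beam's actual API:

   ```python
   import threading

   class LabelStore:
       """Illustrative holder for metric labels shared between a processing
       thread (writer) and a progress-reporting thread (reader)."""

       def __init__(self):
           self._lock = threading.Lock()
           self._labels = {}

       def set(self, key, value):
           # Writer side: every mutation happens under the lock.
           with self._lock:
               self._labels[key] = value

       def snapshot(self):
           # Reader side: copy under the same lock, so iterating the returned
           # dict can never race with set().
           with self._lock:
               return dict(self._labels)

   store = LabelStore()
   store.set("PTRANSFORM", "t1")
   frozen = store.snapshot()
   store.set("PCOLLECTION", "pc1")  # later writes don't affect the snapshot
   ```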
   
   I am marking this as P1 because I assume race conditions can cause data loss, etc. Let me know if this is inappropriate.
   
   ### Issue Priority
   
   Priority: 1 (data loss / total loss of function)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]