ryan-mbuashundip commented on code in PR #35298:
URL: https://github.com/apache/beam/pull/35298#discussion_r2152569472


##########
sdks/python/apache_beam/runners/worker/sdk_worker_main.py:
##########
@@ -167,7 +167,8 @@ def create_harness(environment, dry_run=False):
       enable_heap_dump=enable_heap_dump,
       data_sampler=data_sampler,
       deferred_exception=deferred_exception,
-      runner_capabilities=runner_capabilities)
+      runner_capabilities=runner_capabilities,
+      sdk_options=sdk_pipeline_options)

Review Comment:
   This is probably best left as a TODO for now. Since `sdk_pipeline_options` 
is now passed in its entirety to `SdkHarness`, it might be a good idea to 
remove the other constructor parameters that are derived from 
`sdk_pipeline_options` and compute them in the constructor body instead.
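
   A rough sketch of the shape this could take, not the actual `SdkHarness` 
signature. `FakePipelineOptions`, `HarnessSketch`, and the option keys 
`enable_heap_dump` and `restart_lull_timeout_ns` are hypothetical stand-ins 
used only for illustration:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class FakePipelineOptions:
    """Hypothetical stand-in for the SDK pipeline options object."""
    values: Dict[str, Any] = field(default_factory=dict)

    def get(self, name: str, default: Any = None) -> Any:
        return self.values.get(name, default)


class HarnessSketch:
    """Illustrative only; this is not the real SdkHarness constructor."""

    def __init__(self, control_address: str, sdk_options: FakePipelineOptions):
        self._control_address = control_address
        self._sdk_options = sdk_options
        # Values that used to arrive as separate constructor parameters are
        # derived here from the options object instead (assumed option keys).
        self._enable_heap_dump: bool = bool(
            sdk_options.get("enable_heap_dump", False))
        self._restart_lull_timeout_ns: Optional[int] = sdk_options.get(
            "restart_lull_timeout_ns")


harness = HarnessSketch(
    "localhost:1234", FakePipelineOptions({"enable_heap_dump": True}))
```

   The call site then only passes the options object, and adding a new 
option-derived setting no longer changes the constructor signature.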



##########
sdks/python/apache_beam/runners/worker/worker_status.py:
##########
@@ -250,6 +263,13 @@ def _log_lull_in_bundle_processor(self, bundle_process_cache):
           if processor:
             info = processor.state_sampler.get_info()
             self._log_lull_sampler_info(info, instruction)
+            if self._restart_lull_timeout_ns and self._restart_lull(info):
+              sys.exit(1)

Review Comment:
   +1, the exceptions route aligns well with how the timeout was implemented in 
the Java SDK.
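
   For illustration, a minimal sketch of that route, assuming it means 
raising an exception in place of the `sys.exit(1)` call. `LullTimeoutError` 
and `check_lull` are hypothetical names, not taken from this PR or from the 
Java SDK:

```python
from typing import Optional


class LullTimeoutError(RuntimeError):
    """Hypothetical exception type signalling that a lull exceeded the limit."""


def check_lull(
    time_since_transition_ns: int,
    restart_lull_timeout_ns: Optional[int]) -> None:
    """Raise if the lull is longer than the configured timeout.

    A timeout of None or 0 disables the check, mirroring the truthiness
    test on `self._restart_lull_timeout_ns` in the diff above.
    """
    if (restart_lull_timeout_ns and
            time_since_transition_ns > restart_lull_timeout_ns):
        raise LullTimeoutError(
            "Lull of %d ns exceeded timeout of %d ns" %
            (time_since_transition_ns, restart_lull_timeout_ns))
```

   Raising lets the caller decide how to shut down and keeps the check easy 
to unit test, since nothing terminates the interpreter directly.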



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
