tkaymak commented on code in PR #39088:
URL: https://github.com/apache/beam/pull/39088#discussion_r3467120710
##########
sdks/python/apache_beam/io/external/xlang_mqttio_it_test.py:
##########
@@ -214,12 +214,20 @@ def subscribe():
publisher.start()
subscriber.start()
- options = PipelineOptions([
- '--runner=PrismRunner',
- '--environment_type=LOOPBACK',
- '--streaming',
- ])
- p = TestPipeline(options=options)
+ # MqttIO read is unbounded, so this pipeline runs in streaming mode and
+ # never terminates on its own. Amend the harness-provided pipeline options
+ # rather than discarding them: enable streaming, run non-blocking so the
+ # observe-then-cancel logic below can execute, and target the Prism
portable
+ # runner. The latter is required because SwitchingDirectRunner disables its
+ # Prism delegation for pipelines containing external (cross-language)
+ # transforms (see runners/direct/direct_runner.py) and falls back to the
+ # BundleBasedDirectRunner, which cannot execute an unbounded read.
+ p = TestPipeline(blocking=False)
+ standard_options = p.get_pipeline_options().view_as(StandardOptions)
+ standard_options.streaming = True
+ standard_options.runner = 'PrismRunner'
+ p.get_pipeline_options().view_as(
+ PortableOptions).environment_type = 'LOOPBACK'
Review Comment:
Good catch — you're right, the runner is instantiated during `TestPipeline`
construction, so the post-hoc `StandardOptions.runner` assignment was a no-op
(the pipeline ran on the harness `SwitchingDirectRunner`). Fixed in fa792fed496
by passing `runner='PrismRunner'` to the constructor; the remaining harness
options are still only amended. Verified locally: the streaming read/write IT
now passes end to end on Prism.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]