scwhittle commented on code in PR #36001:
URL: https://github.com/apache/beam/pull/36001#discussion_r2313339808

##########
sdks/python/apache_beam/runners/direct/direct_runner.py:
##########
@@ -202,9 +201,17 @@ def visit_transform(self, applied_ptransform):
 # Use BundleBasedDirectRunner if other runners are missing needed features.
 runner = BundleBasedDirectRunner()

+ # Check if transform overrides are needed - if so,
+ # use BundleBasedDirectRunner
+ # since Prism does not support transform overrides
+ transform_overrides = _get_transform_overrides(options)
+ if transform_overrides:

Review Comment:
For Java, instead of using overrides to implement Dataflow batch, the direct runner, etc., we only override for Dataflow streaming (which specializes by publishing internally). I think this is a better approach because the basic pubsub write transform then works as is, and overriding is only for specialization.

See https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1624
and the override (which also has an experiment to disable it):
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L659

Can we do that for Python too by:
- implementing PubsubSink in pubsub.py with something like the existing direct runner implementation. We may need to add some pushback if the publisher client itself doesn't do it, as otherwise we may pull all the messages into memory, have them pile up in publishing, and OOM.
- removing all the direct runner stuff
- adding a transform override for Dataflow streaming and removing it for Dataflow batch

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
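
To make the pushback concern in the first bullet of the review comment concrete, here is a minimal sketch of one way to cap the number of unacknowledged publishes. It is not code from the PR or from Beam: `BoundedPublisher`, `publish_fn`, `max_in_flight`, and `fake_send` are hypothetical names, and the sketch assumes the publish call returns a `concurrent.futures.Future`-style object that supports `add_done_callback` and `result`. A thread pool stands in for a real publisher client so the example runs on its own.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class BoundedPublisher:
  """Hypothetical wrapper that blocks once too many publishes are pending."""

  def __init__(self, publish_fn, max_in_flight=1000):
    # Assumption: publish_fn(data) returns a Future-like object.
    self._publish_fn = publish_fn
    self._slots = threading.BoundedSemaphore(max_in_flight)
    self._lock = threading.Lock()
    self._pending = set()

  def publish(self, data):
    # Pushback: block the caller while max_in_flight publishes are outstanding.
    self._slots.acquire()
    try:
      future = self._publish_fn(data)
    except Exception:
      self._slots.release()
      raise
    with self._lock:
      self._pending.add(future)
    future.add_done_callback(self._on_done)
    return future

  def _on_done(self, future):
    # Free the slot whether the publish succeeded or failed.
    with self._lock:
      self._pending.discard(future)
    self._slots.release()

  def flush(self, timeout=None):
    # Wait for everything still pending; re-raises the first publish error.
    with self._lock:
      pending = list(self._pending)
    for future in pending:
      future.result(timeout=timeout)


# Stand-in for a real client: a thread pool that records peak concurrency.
executor = ThreadPoolExecutor(max_workers=8)
stats = {"current": 0, "peak": 0}
stats_lock = threading.Lock()


def fake_send(data):
  with stats_lock:
    stats["current"] += 1
    stats["peak"] = max(stats["peak"], stats["current"])
  time.sleep(0.01)  # simulated network latency
  with stats_lock:
    stats["current"] -= 1
  return data


publisher = BoundedPublisher(
    lambda data: executor.submit(fake_send, data), max_in_flight=2)
futures = [publisher.publish(i) for i in range(10)]
publisher.flush()
results = [f.result() for f in futures]
executor.shutdown()
```

In a sink, `publish` would be called per element and `flush` at the end of each bundle, so a slow publisher stalls element processing rather than letting messages accumulate in memory. If the publisher client already offers blocking flow control, configuring that would likely be preferable to a wrapper like this.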