scwhittle commented on code in PR #36001:
URL: https://github.com/apache/beam/pull/36001#discussion_r2313339808
##########
sdks/python/apache_beam/runners/direct/direct_runner.py:
##########
@@ -202,9 +201,17 @@ def visit_transform(self, applied_ptransform):
# Use BundleBasedDirectRunner if other runners are missing needed features.
runner = BundleBasedDirectRunner()
+ # Check if transform overrides are needed - if so,
+ # use BundleBasedDirectRunner
+ # since Prism does not support transform overrides
+ transform_overrides = _get_transform_overrides(options)
+ if transform_overrides:
Review Comment:
In Java, instead of using overrides to implement the write for Dataflow
batch, the direct runner, etc., we only override for Dataflow streaming
(which specializes by publishing internally). I think this is a better
approach because the basic pubsub write transform then works as is, and
overriding is used only for specialization.
See
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1624
and the override (which also has an experiment to disable it):
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L659
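
To make the shape of that pattern concrete, here is a minimal sketch. It
deliberately uses plain Python classes rather than Beam APIs:
`DefaultPubsubWrite`, `NativeStreamingWrite`, `pick_write`, and the
`disable_override` flag are hypothetical names for illustration, not the
actual Java or Python SDK code.

```python
class DefaultPubsubWrite:
    """Hypothetical basic write: works on any runner without an override."""

    def __init__(self, topic):
        self.topic = topic

    def describe(self):
        return f"publish to {self.topic} with the client library"


class NativeStreamingWrite(DefaultPubsubWrite):
    """Hypothetical specialization that a streaming runner swaps in."""

    def describe(self):
        return f"publish to {self.topic} inside the runner"


def pick_write(write, runner, streaming, disable_override=False):
    """Return the transform a runner would execute.

    Only the (runner == "dataflow", streaming) combination specializes;
    every other runner or mode keeps the default transform untouched.
    """
    specialize = runner == "dataflow" and streaming and not disable_override
    return NativeStreamingWrite(write.topic) if specialize else write
```

The point is that the default transform is usable everywhere, so no runner
has to install an override just to make the write function at all.
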
Can we do that for Python too by:
- implementing PubsubSink in pubsub.py with something like the existing
Direct runner implementation. We may need to add some pushback if the
publisher client itself doesn't provide it; otherwise we may pull all the
messages into memory, have them pile up in publishing, and OOM.
- removing all the direct runner stuff
- adding a transform override for Dataflow streaming and removing it for
Dataflow batch
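
As a rough illustration of the pushback I have in mind, here is a minimal
sketch that caps the number of unacknowledged publishes with a semaphore.
It assumes the publish call returns a future-like object with
`add_done_callback`; `BoundedPublisher`, `publish_fn`, and `max_in_flight`
are hypothetical names, and the demo uses a thread pool as a stand-in for a
real publisher client.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class BoundedPublisher:
    """Hypothetical wrapper that blocks once too many publishes are pending."""

    def __init__(self, publish_fn, max_in_flight=100):
        # Assumption: publish_fn(data) returns a future-like object that
        # supports add_done_callback(), cancelled() and exception().
        self._publish_fn = publish_fn
        self._max_in_flight = max_in_flight
        self._slots = threading.Semaphore(max_in_flight)
        self._lock = threading.Lock()
        self._errors = []

    def publish(self, data):
        # Blocks the caller (the pushback) while max_in_flight are pending.
        self._slots.acquire()
        try:
            future = self._publish_fn(data)
        except Exception:
            self._slots.release()
            raise
        future.add_done_callback(self._on_done)

    def _on_done(self, future):
        # Record any failure before freeing the slot so flush() sees it.
        if not future.cancelled() and future.exception() is not None:
            with self._lock:
                self._errors.append(future.exception())
        self._slots.release()

    def flush(self):
        # Holding every slot means no publish is still in flight.
        for _ in range(self._max_in_flight):
            self._slots.acquire()
        for _ in range(self._max_in_flight):
            self._slots.release()
        with self._lock:
            errors, self._errors = self._errors, []
        if errors:
            raise RuntimeError(
                f"{len(errors)} publish call(s) failed") from errors[0]


if __name__ == "__main__":
    # Stand-in publisher: a thread pool whose tasks pretend to send data.
    with ThreadPoolExecutor(max_workers=4) as pool:
        publisher = BoundedPublisher(
            lambda data: pool.submit(len, data), max_in_flight=2)
        for i in range(10):
            publisher.publish(b"message-%d" % i)
        publisher.flush()
```

With something like this, a bundle would call `publish` per element and
`flush` at the end of the bundle, so memory is bounded by `max_in_flight`
messages rather than by the size of the input.
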
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.