damccorm opened a new issue, #21103: URL: https://github.com/apache/beam/issues/21103
Please see the Stack Overflow discussion: [https://stackoverflow.com/questions/68125864/transform-node-appliedptransform-was-not-replaced-as-expected-error-with-the-dir](https://stackoverflow.com/questions/68125864/transform-node-appliedptransform-was-not-replaced-as-expected-error-with-the-dir)

When I create a GCS source and a Pub/Sub source and try to flatten the two PCollections, the pipeline fails on the direct runner while the runner is applying its transform overrides.

Code example:

```python
import json
import apache_beam as beam

# `p` is the beam.Pipeline (not shown); Pub/Sub reads generally require streaming mode.
gcsEventsColl = p | "Read from GCS" >> beam.io.ReadFromText("gs://sample_events_for_beam/*.log") \
    | 'convert to dict' >> beam.Map(lambda x: json.loads(x))
liveEventsColl = p | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic="projects/axxxx/topics/input_topic") \
    | 'convert to dict2' >> beam.Map(lambda x: json.loads(x))
input_rec = (gcsEventsColl, liveEventsColl) | 'flatten' >> beam.Flatten()
```

Error:

```
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 564, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 529, in run_pipeline
    pipeline.replace_all(_get_transform_overrides(options))
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 504, in replace_all
    self._check_replacement(override)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 478, in _check_replacement
    self.visit(ReplacementValidator())
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 611, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)
  [Previous line repeated 4 more times]
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1198, in visit
    visitor.visit_transform(self)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 476, in visit_transform
    transform_node)
RuntimeError: Transform node AppliedPTransform(Read from GCS/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.
```

The direct runner corrupts the pipeline when it rewrites the transforms: the `_GroupByKeyOnly` node inside the GCS read's `SDFBoundedSourceReader` expansion is not replaced as the override pass expects.

Imported from Jira [BEAM-12586](https://issues.apache.org/jira/browse/BEAM-12586). Original Jira may contain additional context.
Reported by: rodriguezc.
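Judging from the traceback (`replace_all` → `_check_replacement` → `ReplacementValidator.visit_transform`), the error is raised by a validation pass: after an override is applied, every transform is visited again, and any node that still matches the override raises a `RuntimeError`. The following is a minimal, self-contained sketch of that replace-then-validate pattern. It does not use Beam; `Node`, `Override`, `replace`, `check_replacement`, `replace_all`, and the `skip_labels` parameter are hypothetical names for illustration only. `skip_labels` simply simulates a node that the replacement pass fails to reach and is not a claim about the actual root cause of this bug.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class Node:
    """Hypothetical stand-in for a pipeline transform node."""
    full_label: str
    kind: str  # e.g. "_GroupByKeyOnly"
    parts: List["Node"] = field(default_factory=list)


@dataclass
class Override:
    """Hypothetical override: matches nodes of one kind and swaps in another kind."""
    target_kind: str
    replacement_kind: str

    def matches(self, node: Node) -> bool:
        return node.kind == self.target_kind


def walk(node: Node, visit: Callable[[Node], None]) -> None:
    # Depth-first traversal: visit all children first, then the node itself.
    for part in node.parts:
        walk(part, visit)
    visit(node)


def replace(root: Node, override: Override, skip_labels=frozenset()) -> None:
    # Replacement pass. Nodes whose label is in `skip_labels` are left alone,
    # which simulates nodes the pass fails to reach.
    def do_replace(node: Node) -> None:
        if override.matches(node) and node.full_label not in skip_labels:
            node.kind = override.replacement_kind
    walk(root, do_replace)


def check_replacement(root: Node, override: Override) -> None:
    # Validation pass: any node that still matches means replacement failed.
    def validate(node: Node) -> None:
        if override.matches(node):
            raise RuntimeError(
                f"Transform node {node.full_label} ({node.kind}) "
                "was not replaced as expected.")
    walk(root, validate)


def replace_all(root: Node, overrides: List[Override], skip_labels=frozenset()) -> None:
    # Apply each override, then immediately validate it.
    for override in overrides:
        replace(root, override, skip_labels)
        check_replacement(root, override)


if __name__ == "__main__":
    gbk_label = ("Read from GCS/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/"
                 "ProcessKeyedElements/GroupByKey/GroupByKey")
    root = Node("", "composite", [Node(gbk_label, "_GroupByKeyOnly"), Node("flatten", "Flatten")])
    overrides = [Override("_GroupByKeyOnly", "ReplacedGroupByKey")]
    try:
        # Simulate the replacement pass missing the GCS read's GroupByKey node.
        replace_all(root, overrides, skip_labels={gbk_label})
    except RuntimeError as err:
        print(err)
```

The design point the sketch illustrates is that validation runs separately from replacement, so a node the replacement pass misses for any reason surfaces as this error rather than silently running with the original, un-overridden transform.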
