damccorm opened a new issue, #21103:
URL: https://github.com/apache/beam/issues/21103

   Please see Stack Overflow discussion:
   
   
[https://stackoverflow.com/questions/68125864/transform-node-appliedptransform-was-not-replaced-as-expected-error-with-the-dir](https://stackoverflow.com/questions/68125864/transform-node-appliedptransform-was-not-replaced-as-expected-error-with-the-dir)
   
   When I create a GCS source and a Pub/Sub source and try to flatten the two, 
the pipeline fails with an error that appears to come from an incompatible 
transform replacement performed by the direct runner.
   
   Code example:
   ```
   import json

   import apache_beam as beam

   # `p` is an existing beam.Pipeline; its construction is not shown in the report.
   gcsEventsColl = (
       p
       | "Read from GCS" >> beam.io.ReadFromText("gs://sample_events_for_beam/*.log")
       | 'convert to dict' >> beam.Map(lambda x: json.loads(x)))
   liveEventsColl = (
       p
       | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic="projects/axxxx/topics/input_topic")
       | 'convert to dict2' >> beam.Map(lambda x: json.loads(x)))

   # Merge the bounded (GCS) and unbounded (Pub/Sub) collections.
   input_rec = (gcsEventsColl, liveEventsColl) | 'flatten' >> beam.Flatten()
   ```
   
   Error:
   ```
     File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 564, in run
       return self.runner.run_pipeline(self, self._options)
     File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
       return runner.run_pipeline(pipeline, options)
     File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 529, in run_pipeline
       pipeline.replace_all(_get_transform_overrides(options))
     File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 504, in replace_all
       self._check_replacement(override)
     File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 478, in _check_replacement
       self.visit(ReplacementValidator())
     File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 611, in visit
       self._root_transform().visit(visitor, self, visited)
     File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
       part.visit(visitor, pipeline, visited)
     File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
       part.visit(visitor, pipeline, visited)
     File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
       part.visit(visitor, pipeline, visited)
     [Previous line repeated 4 more times]
     File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1198, in visit
       visitor.visit_transform(self)
     File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 476, in visit_transform
       transform_node)
   RuntimeError: Transform node AppliedPTransform(Read from GCS/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.
   ```
   
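   The direct runner appears to leave the pipeline in an inconsistent state when 
it rewrites the transforms: `pipeline.replace_all` applies the runner's 
overrides, and the validation pass that follows finds a `_GroupByKeyOnly` node 
under "Read from GCS" that was not replaced.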
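
To make the failing check easier to follow, here is a simplified toy model of the validation step that the traceback points at (`_check_replacement` visiting the pipeline with a `ReplacementValidator`). It is not Beam's implementation: `Node`, `check_replacement`, and `is_gbk_only` are hypothetical names, and the labels are made up. It only illustrates the idea that, after a replacement pass, any node still matching the override causes a `RuntimeError`.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class Node:
    # Hypothetical stand-in for an applied transform; not a Beam class.
    label: str
    kind: str
    parts: List["Node"] = field(default_factory=list)


def check_replacement(root: Node, matches: Callable[[Node], bool]) -> None:
    # Walk the whole tree after the replacement pass and fail on any
    # node that the override still matches.
    stack = [root]
    while stack:
        node = stack.pop()
        if matches(node):
            raise RuntimeError(
                f"Transform node {node.label} ({node.kind}) "
                "was not replaced as expected.")
        stack.extend(node.parts)


def is_gbk_only(node: Node) -> bool:
    # Assumed matcher: the override targets nodes of kind _GroupByKeyOnly.
    return node.kind == "_GroupByKeyOnly"


# A tree in which one nested node escaped the replacement pass.
pipeline = Node("root", "Composite", [
    Node("Read", "Composite", [Node("Read/GroupByKey", "_GroupByKeyOnly")]),
    Node("Flatten", "Flatten"),
])

try:
    check_replacement(pipeline, is_gbk_only)
except RuntimeError as err:
    print(err)
```

In this model the error names the first matching node found, much as the real error above names the nested `GroupByKey` inside the GCS read. Why that node is skipped by the direct runner's actual replacement pass is not established here.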
   
    
   
   Imported from Jira 
[BEAM-12586](https://issues.apache.org/jira/browse/BEAM-12586). Original Jira 
may contain additional context.
   Reported by: rodriguezc.

