Deepak Verma created BEAM-9417:
----------------------------------

             Summary: Unable to read from BigQuery and the file system in the same pipeline
                 Key: BEAM-9417
                 URL: https://issues.apache.org/jira/browse/BEAM-9417
             Project: Beam
          Issue Type: Bug
          Components: io-py-gcp
         Environment: MacBook Pro, macOS Catalina, Python 3.7, apache-beam[gcp]===2.19.0
            Reporter: Deepak Verma


I am trying to read from both BigQuery and the local file system in the same Apache Beam (apache-beam[gcp]) pipeline.
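{code:python}
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# PreProcessOptions (which defines --bq_project and --customer) and
# UnicodeCoder are custom classes from my own code, not shown here.

pipeline_options = PipelineOptions()
options = pipeline_options.view_as(PreProcessOptions)
options.view_as(SetupOptions).save_main_session = True

p = beam.Pipeline(options=options)

apn_query = "select * from `{bq_project}.config.apn` where customer='{customer}'"\
    .format(bq_project=options.bq_project, customer=options.customer)

file_path = "mycsv.csv.gz"

# Source 1: results of a BigQuery standard SQL query.
apn = p | beam.io.Read(
    beam.io.BigQuerySource(query=apn_query, use_standard_sql=True))
# Source 2: a gzipped CSV file on the local file system.
preprocess_rows = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())

p.run().wait_until_finish()
{code}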

When I run this job on the DirectRunner, I get the following error:

{code}
Traceback (most recent call last):
  File "/etl/dataflow/etlTXLPreprocessor.py", line 125, in <module>
    run()
  File "/etl/dataflow/etlTXLPreprocessor.py", line 120, in run
    p.run().wait_until_finish()
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 461, in run
    self._options).run(False)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 474, in run
    return self.runner.run_pipeline(self, self._options)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 182, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 413, in run_pipeline
    pipeline.replace_all(_get_transform_overrides(options))
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 443, in replace_all
    self._replace(override)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 340, in _replace
    self.visit(TransformUpdater(self))
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 503, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 942, in visit
    visitor.visit_transform(self)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 338, in visit_transform
    self._replace_if_needed(transform_node)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 301, in _replace_if_needed
    new_output = replacement_transform.expand(input_node)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/sdf_direct_runner.py", line 87, in expand
    invoker = DoFnInvoker.create_invoker(signature, process_invocation=False)
  File "apache_beam/runners/common.py", line 360, in apache_beam.runners.common.DoFnInvoker.create_invoker
TypeError: create_invoker() takes at least 2 positional arguments (1 given)
{code}
 

But if I read only from the local file, like this:
{code:python}
pipeline_options = PipelineOptions()
options = pipeline_options.view_as(PreProcessOptions)
options.view_as(SetupOptions).save_main_session = True

p = beam.Pipeline(options=options)
file_path = "mycsv.csv.gz"
preprocess_rows = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())
{code}

or only from BigQuery, like this:
{code:python}
pipeline_options = PipelineOptions()
options = pipeline_options.view_as(PreProcessOptions)
options.view_as(SetupOptions).save_main_session = True

p = beam.Pipeline(options=options)

apn_query = "select * from `{bq_project}.config.apn` where customer='{customer}'"\
    .format(bq_project=options.bq_project, customer=options.customer)

apn = p | beam.io.Read(
    beam.io.BigQuerySource(query=apn_query, use_standard_sql=True))
{code}

or even read from BigQuery twice, like this:
{code:python}
pipeline_options = PipelineOptions()
options = pipeline_options.view_as(PreProcessOptions)
options.view_as(SetupOptions).save_main_session = True

p = beam.Pipeline(options=options)

apn_query = "select * from `{bq_project}.config.apn` where customer='{customer}'"\
    .format(bq_project=options.bq_project, customer=options.customer)

apn = p | beam.io.Read(
    beam.io.BigQuerySource(query=apn_query, use_standard_sql=True))
apn = p | beam.io.Read(
    beam.io.BigQuerySource(query=apn_query, use_standard_sql=True))
{code}
then the code works fine.

Is it a limitation of Apache Beam that a single pipeline cannot read from two different sources (here, BigQuery and the local file system)?

If so, could this feature be added?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)