Hi All,
I was able to make it work by creating the PCollection from the numpy
array. However, writing to BQ failed because it requires a schema.
The code:
(p | "create all" >> beam.Create(expression[1:5,1:5])
| "write all text" >> beam.io.WriteToText('gs://archs4/output/',
file_name_suffix='.txt'))
*Is there a workaround for providing a schema to beam.io.BigQuerySink?*
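For reference, here is a rough, untested sketch of how I think the schema string could be passed. The table name, column names, and the to_row helper below are placeholders of my own, not something I have confirmed against my data:

def to_row(values):
    # placeholder helper: BigQuery rows need to be dicts keyed by column name
    return {'col_%d' % i: float(v) for i, v in enumerate(values)}

(p | "create all" >> beam.Create(expression[1:5, 1:5])
   | "to rows" >> beam.Map(to_row)
   | "write all bq" >> beam.io.Write(beam.io.BigQuerySink(
         'my_dataset.my_table',  # placeholder table name
         schema='col_0:FLOAT,col_1:FLOAT,col_2:FLOAT,col_3:FLOAT')))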
Many thanks,
Eila
On Mon, Apr 2, 2018 at 11:33 AM, OrielResearch Eila Arich-Landkof <
[email protected]> wrote:
> Hello all,
>
> I would like to try a different way to leverage Apache Beam for H5 => BQ
> (file-to-table transfer).
>
> For my use case, I would like to read the H5 data (numpy array format) in
> chunks of 10K rows, transpose each chunk, and write it to BQ as 10K
> columns. 10K is the BQ column limit.
>
> My code is below and fires the following error (I might have missed
> something basic). I am not using beam.Create and am trying to create a
> PCollection from the ParDo transform. Is this possible? If not, what is the
> alternative for creating a PCollection from a numpy array (if any)?
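>
> (For reference, the only pattern I know of is to seed the pipeline with
> beam.Create. A minimal, untested sketch, assuming `expression` is the
> in-memory numpy array:)
>
> import apache_beam as beam
>
> # minimal sketch (untested): beam.Create takes any iterable, and iterating
> # a 2-D numpy array yields one element per row
> rows = p | "create rows" >> beam.Create(expression[0:10000, :])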
>
> ERROR:root:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f00aad7b7a0>, due to an exception.
> Traceback (most recent call last):
>   File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 307, in call
>     side_input_values)
>   File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 332, in attempt_call
>     evaluator.start_bundle()
>   File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 540, in start_bundle
>     self._applied_ptransform.inputs[0].windowing,
> AttributeError: 'PBegin' object has no attribute 'windowing'
>
> ERROR:root:Giving up after 4 attempts.
> WARNING:root:A task failed with exception: 'PBegin' object has no attribute 'windowing'
> WARNING:root:A task failed with exception: 'PBegin' object has no attribute 'windowing'
>
>
>
> *Code:*
>
> options = PipelineOptions()
> google_cloud_options = options.view_as(GoogleCloudOptions)
> google_cloud_options.project = 'orielresearch-188115'
> google_cloud_options.job_name = 'h5-to-bq-10K'
> google_cloud_options.staging_location = 'gs://archs4/staging'
> google_cloud_options.temp_location = 'gs://archs4/temp'
> options.view_as(StandardOptions).runner = 'DirectRunner'
>
> p = beam.Pipeline(options=options)
>
> class read10kRowsDoFn(beam.DoFn):
>     def __init__(self, index):
>         # start row of the 10K-row chunk this DoFn should read
>         self._index = index
>
>     def process(self, element):
>         print(self._index)
>         row_start = self._index
>         row_end = self._index + 10000
>         # slice 10K rows (numpy.ndarray) and transpose them
>         d = expression[row_start:row_end, :]
>         d = np.transpose(d)
>         return d
>
> #for i in range(0, expression.shape[0], 10000):
> k = 210  # allows creating unique labels for the runner
> for i in range(0, 3, 2):  # test
>     k += 1
>     bigQuery_dataset_table_name = bigquery_dataset_name + '.' + bigquery_table_name + str(k)
>     print(bigQuery_dataset_table_name)
>     label_read_row = "read " + bigQuery_dataset_table_name
>     label_write_col = "write " + bigQuery_dataset_table_name
>     *# is it possible to generate a PCollection with ParDo without Create?*
>     (p | label_read_row >> beam.ParDo(read10kRowsDoFn(i))
>        | label_write_col >> beam.io.Write(beam.io.BigQuerySink(bigQuery_dataset_table_name)))
>
> p.run().wait_until_finish()  *# fires the error above*
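>
> (One alternative I may try, to avoid applying ParDo directly to the empty
> pipeline: seed it with beam.Create of the chunk start indices and read
> inside the DoFn. A rough, untested sketch; ReadChunkDoFn and the output
> path are placeholders of my own:)
>
> class ReadChunkDoFn(beam.DoFn):
>     def process(self, start):
>         # 'start' comes from the Create below; read and transpose one 10K-row chunk
>         return np.transpose(expression[start:start + 10000, :])
>
> (p | "create starts" >> beam.Create(range(0, expression.shape[0], 10000))
>    | "read chunks" >> beam.ParDo(ReadChunkDoFn())
>    | "write chunks" >> beam.io.WriteToText('gs://archs4/output/chunks'))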
>
> Many thanks,
>
> --
> Eila
> www.orielresearch.org
> https://www.meetup.com/Deep-Learning-In-Production/
>
--
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/