Hi All,

I was able to make it work by creating the PCollection from the numpy array. However, writing to BQ was not possible because it requires a schema. The code:

(p | "create all" >> beam.Create(expression[1:5,1:5])
   | "write all text" >> beam.io.WriteToText('gs://archs4/output/', file_name_suffix='.txt'))

*Is there a workaround for providing a schema to beam.io.BigQuerySink?*
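Something along the lines of the sketch below is what I have in mind. The table and column names here are made up, and I am assuming the values are floats and that each numpy row can be mapped to a dict keyed by column name (BigQuerySink takes a schema either as a 'name:TYPE,...' string or as a TableSchema object):

import apache_beam as beam

# Minimal sketch with placeholder names: expression[1:5,1:5] is a 4x4
# slice, so each element becomes one dict row with hypothetical columns
# col0..col3, and the schema lists one FLOAT field per column.
schema = ','.join('col%d:FLOAT' % i for i in range(4))

(p | "create all" >> beam.Create(expression[1:5, 1:5].tolist())
   | "to dicts" >> beam.Map(lambda row: {'col%d' % i: v for i, v in enumerate(row)})
   | "write bq" >> beam.io.Write(beam.io.BigQuerySink(
       'mydataset.mytable',  # hypothetical dataset.table
       schema=schema,
       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
       write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))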
Many thanks,
Eila

(Re the open question in the quoted message below about creating a PCollection with ParDo but without beam.Create: a sketch of the alternative I am now trying is at the end of this message.)

On Mon, Apr 2, 2018 at 11:33 AM, OrielResearch Eila Arich-Landkof <
e...@orielresearch.org> wrote:

> Hello all,
>
> I would like to try a different way to leverage Apache Beam for H5 => BQ
> (file-to-table transfer).
>
> For my use case, I would like to read every 10K rows of H5 data (numpy
> array format), transpose them, and write them to BQ as 10K columns. 10K is
> BQ's column limit.
>
> My code is below and fires the following error (I might have missed
> something basic). I am not using beam.Create and am trying to create a
> PCollection from the ParDo transform. Is this possible? If not, what is
> the alternative for creating a PCollection from a numpy array (if any)?
>
> ERROR:root:Exception at bundle
> <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f00aad7b7a0>,
> due to an exception.
> Traceback (most recent call last):
>   File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 307, in call
>     side_input_values)
>   File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 332, in attempt_call
>     evaluator.start_bundle()
>   File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 540, in start_bundle
>     self._applied_ptransform.inputs[0].windowing,
> AttributeError: 'PBegin' object has no attribute 'windowing'
>
> ERROR:root:Giving up after 4 attempts.
> WARNING:root:A task failed with exception: 'PBegin' object has no attribute 'windowing'
> WARNING:root:A task failed with exception: 'PBegin' object has no attribute 'windowing'
>
> *Code:*
>
> options = PipelineOptions()
> google_cloud_options = options.view_as(GoogleCloudOptions)
> google_cloud_options.project = 'orielresearch-188115'
> google_cloud_options.job_name = 'h5-to-bq-10K'
> google_cloud_options.staging_location = 'gs://archs4/staging'
> google_cloud_options.temp_location = 'gs://archs4/temp'
> options.view_as(StandardOptions).runner = 'DirectRunner'
>
> p = beam.Pipeline(options=options)
>
> class read10kRowsDoFn(beam.DoFn):
>     def process(self, element, index):
>         print(index)
>         row_start = index
>         row_end = index + 10000
>         # returns a transposed numpy array (numpy.ndarray)
>         d = expression[row_start:row_end, :]
>         d = np.transpose(d)
>         return d
>
> # for i in range(0, expression.shape[0], 10000):
> k = 210  # allows creating unique labels for the runner
> for i in range(0, 3, 2):  # test
>     k += 1
>     bigQuery_dataset_table_name = bigquery_dataset_name + '.' + bigquery_table_name + str(k)
>     print(bigQuery_dataset_table_name)
>     label_read_row = "read " + bigQuery_dataset_table_name
>     label_write_col = "write " + bigQuery_dataset_table_name
>     # *is it possible to generate a PCollection with ParDo, without Create?*
>     (p | label_read_row >> beam.ParDo(read10kRowsDoFn(i))
>        | label_write_col >> beam.io.Write(beam.io.BigQuerySink(bigQuery_dataset_table_name)))
>
> p.run().wait_until_finish()  # *fires an error*
>
> Many thanks,
>
> --
> Eila
> www.orielresearch.org
> https://www.meetup.com/Deep-Learning-In-Production/

--
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/
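Here is the sketch I mentioned above. My understanding is that a pipeline has to start from a root transform such as beam.Create or a Read (which is why applying ParDo directly to p fails on the PBegin object), so I seed each branch with the chunk's start index and let the DoFn read and transpose the slice. The dict mapping and the FLOAT-only schema are my assumptions; expression and the table-name variables are as in the code above:

import apache_beam as beam
import numpy as np

class Read10kRowsDoFn(beam.DoFn):
    """Reads one 10K-row slice of expression and transposes it so that
    the (up to) 10K rows become the BigQuery columns."""
    def process(self, row_start):
        chunk = np.transpose(expression[row_start:row_start + 10000, :])
        for row in chunk.tolist():
            # hypothetical column names col0, col1, ... one per value
            yield {'col%d' % i: v for i, v in enumerate(row)}

schema = ','.join('col%d:FLOAT' % i for i in range(10000))  # assumes FLOAT values

k = 210  # unique table suffix, as above
for i in range(0, 3, 2):  # test; real run: range(0, expression.shape[0], 10000)
    k += 1
    table = bigquery_dataset_name + '.' + bigquery_table_name + str(k)
    (p | 'start %d' % i >> beam.Create([i])  # root transform: PBegin -> PCollection
       | 'read %d' % i >> beam.ParDo(Read10kRowsDoFn())
       | 'write %d' % i >> beam.io.Write(beam.io.BigQuerySink(
           table,
           schema=schema,
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)))

p.run().wait_until_finish()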