https://issues.apache.org/jira/browse/BEAM-3167 might be what you want for this to all work as part of the pipeline, but it looks like that is for Java, not Python.

Also, the pandas_gbq library has a function for generating a BQ schema <https://github.com/pydata/pandas-gbq/blob/master/pandas_gbq/_schema.py#L4> from a DataFrame. Maybe something like that would work for your data?
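A rough, untested sketch of that idea (I'm assuming the helper at that link is named generate_bq_schema and takes a default_type argument; the invented column names and the FLOAT default below are guesses for your expression matrix):

    import pandas as pd
    from pandas_gbq._schema import generate_bq_schema

    # Build a small DataFrame from a slice of the H5-backed numpy array just
    # to infer types; numpy provides no column names, so invent some.
    df = pd.DataFrame(expression[0:10, :])
    df.columns = ['col_%d' % c for c in df.columns]

    schema = generate_bq_schema(df, default_type='FLOAT')
    # schema is a dict like {'fields': [{'name': 'col_0', 'type': 'FLOAT'}, ...]}
    # and can be flattened into the 'name:type,name:type' string that
    # beam.io.BigQuerySink accepts as its schema argument.
    schema_str = ','.join('%s:%s' % (f['name'], f['type'])
                          for f in schema['fields'])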
On Mon, Apr 2, 2018 at 5:32 PM, Chamikara Jayalath <chamik...@google.com> wrote:

> On Mon, Apr 2, 2018 at 5:19 PM Eila Arich-Landkof <e...@orielresearch.org> wrote:
>
>> Hi Cham,
>>
>> Thanks. I have created a PCollection from the dataset that is available
>> in the H5 file, which is provided as a numpy array.
>> It is very challenging for my use case to describe the schema. The
>> original dimensions of the dataset are 70K x 30K. Any suggestion how to
>> work around that?
>
> Can you write to a pre-created table by using "create_disposition =
> BigQueryDisposition.CREATE_NEVER"? You can try to use BigQuery schema
> auto-detection (https://cloud.google.com/bigquery/docs/schema-detect) to
> create the table before running the Beam pipeline.
>
>> I think it was mentioned at the summit that there will be a way to
>> write to BQ without a schema. Is something like that on the roadmap?
>
> I don't think supporting this is in the immediate roadmap of Beam, but
> any contributions in this space are welcome.
>
>> Best,
>> Eila
>>
>> Sent from my iPhone
>>
>> On Apr 2, 2018, at 7:33 PM, Chamikara Jayalath <chamik...@google.com> wrote:
>>
>> (moving dev to bcc)
>>
>> Hi Eila,
>>
>> On Mon, Apr 2, 2018 at 3:50 PM OrielResearch Eila Arich-Landkof <e...@orielresearch.org> wrote:
>>
>>> Hi All,
>>>
>>> I was able to make it work by creating the PCollection with the numpy
>>> array. However, writing to BQ was impossible because it requested the
>>> schema. The code:
>>>
>>> (p | "create all" >> beam.Create(expression[1:5,1:5])
>>>    | "write all text" >> beam.io.WriteToText('gs://archs4/output/',
>>>                                              file_name_suffix='.txt'))
>>>
>>> Is there a workaround for providing a schema to beam.io.BigQuerySink?
>>
>> Regarding your earlier question, you do need at least one element in the
>> PCollection that triggers the ParDo to do any work (which can be a create
>> with a single element that you ignore).
>>
>> Not sure if I fully understood the BigQuery question. You have to specify
>> a schema when writing to a new BigQuery table. See the following example:
>>
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py#L1085
>>
>> Thanks,
>> Cham
>>
>>> Many thanks,
>>> Eila
>>>
>>> On Mon, Apr 2, 2018 at 11:33 AM, OrielResearch Eila Arich-Landkof <e...@orielresearch.org> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I would like to try a different way to leverage Apache Beam for H5 =>
>>>> BQ (file-to-table transfer).
>>>>
>>>> For my use case, I would like to read every 10K rows of H5 data (numpy
>>>> array format), transpose them, and write them to BQ as 10K columns.
>>>> 10K is BQ's column limit.
>>>>
>>>> My code is below and fires the following error (I might have missed
>>>> something basic). I am not using beam.Create and am trying to create a
>>>> PCollection from the ParDo transform. Is this possible? If not, what
>>>> is the alternative for creating a PCollection from a numpy array (if
>>>> any)?
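A ParDo cannot hang directly off the pipeline object; that is the PBegin the error below complains about. The minimal workaround is the one Cham describes above: seed the pipeline with a single throwaway element. An untested sketch, with a hypothetical DoFn name and bucket path:

    (p
     | 'seed' >> beam.Create([0])             # one dummy element, ignored
     | 'read' >> beam.ParDo(ReadRowsDoFn())   # now has an element to fire on
     | 'write' >> beam.io.WriteToText('gs://some-bucket/output/'))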
>>>> ERROR:root:Exception at bundle
>>>> <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f00aad7b7a0>,
>>>> due to an exception.
>>>>
>>>> Traceback (most recent call last):
>>>>   File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 307, in call
>>>>     side_input_values)
>>>>   File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 332, in attempt_call
>>>>     evaluator.start_bundle()
>>>>   File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 540, in start_bundle
>>>>     self._applied_ptransform.inputs[0].windowing,
>>>> AttributeError: 'PBegin' object has no attribute 'windowing'
>>>>
>>>> ERROR:root:Giving up after 4 attempts.
>>>> WARNING:root:A task failed with exception: 'PBegin' object has no attribute 'windowing'
>>>> WARNING:root:A task failed with exception: 'PBegin' object has no attribute 'windowing'
>>>>
>>>> Code:
>>>>
>>>> options = PipelineOptions()
>>>> google_cloud_options = options.view_as(GoogleCloudOptions)
>>>> google_cloud_options.project = 'orielresearch-188115'
>>>> google_cloud_options.job_name = 'h5-to-bq-10K'
>>>> google_cloud_options.staging_location = 'gs://archs4/staging'
>>>> google_cloud_options.temp_location = 'gs://archs4/temp'
>>>> options.view_as(StandardOptions).runner = 'DirectRunner'
>>>>
>>>> p = beam.Pipeline(options=options)
>>>>
>>>> class read10kRowsDoFn(beam.DoFn):
>>>>     def process(self, element, index):
>>>>         print(index)
>>>>         row_start = index
>>>>         row_end = index+10000
>>>>         # returns numpy array - numpy.ndarray
>>>>         d = expression[row_start,row_end,:]
>>>>         np.transpose(d)
>>>>         return(d)
>>>>
>>>> #for i in range(0,expression.shape[0],10000):
>>>> k=210 # allows creating unique labels for the runner
>>>> for i in range(0,3,2): # test
>>>>     k+=1
>>>>     bigQuery_dataset_table_name=bigquery_dataset_name+'.'+bigquery_table_name+str(k)
>>>>     print(bigQuery_dataset_table_name)
>>>>     label_read_row = "read "+bigQuery_dataset_table_name
>>>>     label_write_col = "write "+bigQuery_dataset_table_name
>>>>     # is this possible to generate a PCollection with ParDo without create?
>>>>     (p | label_read_row >> beam.ParDo(read10kRowsDoFn(i))
>>>>        | label_write_col >> beam.io.Write(beam.io.BigQuerySink(bigQuery_dataset_table_name)))
>>>>
>>>> p.run().wait_until_finish() # fires an error
>>>>
>>>> Many thanks,
>>>>
>>>> --
>>>> Eila
>>>> www.orielresearch.org
>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>
>>> --
>>> Eila
>>> www.orielresearch.org
>>> https://www.meetup.com/Deep-Learning-In-Production/
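Putting the suggestions in this thread together, here is an untested sketch of the loop above with three changes: a one-element Create so each ParDo has something to fire on, the row slice written with a colon (row_start:row_end, which the comma in the original appears to have intended), and a pre-created table targeted with create_disposition=CREATE_NEVER per the schema auto-detect suggestion. The dict-per-row conversion and the 'col_N' field names are my own assumptions and would have to match the pre-created table's schema:

    import apache_beam as beam
    import numpy as np

    class Read10kRowsDoFn(beam.DoFn):
        def process(self, row_start):
            # Slice 10K rows (note the colon), then transpose so the slice's
            # rows become the table's 10K columns.
            d = expression[row_start:row_start + 10000, :]
            d = np.transpose(d)  # transpose is not in-place; rebind the result
            for row in d:
                # BigQuerySink expects one dict per table row; the field
                # names here are assumed to match the pre-created table.
                yield {'col_%d' % j: float(v) for j, v in enumerate(row)}

    p = beam.Pipeline(options=options)
    for k, i in enumerate(range(0, expression.shape[0], 10000)):
        table = bigquery_dataset_name + '.' + bigquery_table_name + str(k)
        (p
         | 'seed ' + table >> beam.Create([i])  # triggers the DoFn exactly once
         | 'read ' + table >> beam.ParDo(Read10kRowsDoFn())
         | 'write ' + table >> beam.io.Write(beam.io.BigQuerySink(
               table,
               # Table was created beforehand (e.g. via BQ schema auto-detect),
               # so no schema is passed and table creation is disallowed.
               create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)))

    p.run().wait_until_finish()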