https://issues.apache.org/jira/browse/BEAM-3167 might be what you want if
you'd like this to all work as part of the pipeline, but it looks like that
issue is for Java, not Python.

Also, the pandas_gbq library has a function for generating a BQ schema
<https://github.com/pydata/pandas-gbq/blob/master/pandas_gbq/_schema.py#L4>
from a DataFrame. Maybe something like that would work for your data?
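
For example, a rough sketch of that idea (the dtype mapping below is much
simpler than what the pandas_gbq helper does, and the column naming is made up):

import numpy as np
import pandas as pd

def dataframe_to_bq_schema(df, default_type='STRING'):
    """Build a 'name:TYPE,name:TYPE' BigQuery schema string from a DataFrame's dtypes."""
    type_map = {'i': 'INTEGER', 'u': 'INTEGER', 'f': 'FLOAT', 'b': 'BOOLEAN'}
    return ','.join('%s:%s' % (name, type_map.get(dtype.kind, default_type))
                    for name, dtype in df.dtypes.items())

# Small stand-in for one chunk of the H5 dataset:
chunk = np.random.rand(4, 3)
df = pd.DataFrame(chunk, columns=['col_%d' % i for i in range(chunk.shape[1])])
print(dataframe_to_bq_schema(df))  # col_0:FLOAT,col_1:FLOAT,col_2:FLOAT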

On Mon, Apr 2, 2018 at 5:32 PM, Chamikara Jayalath <chamik...@google.com>
wrote:

>
>
> On Mon, Apr 2, 2018 at 5:19 PM Eila Arich-Landkof <e...@orielresearch.org>
> wrote:
>
>> Hi Cham,
>>
>> Thanks. I have created a PCollection from the dataset that is available
>> in the H5 file, which is provided as a numpy array.
>> It is very challenging for my use case to describe the schema; the
>> original dimensions of the dataset are 70K x 30K. Any suggestion on how to
>> work around that?
>>
>>
> Can you write to a pre-created table by using "create_disposition =
> BigQueryDisposition.CREATE_NEVER"? You can try using BigQuery schema
> auto-detection (https://cloud.google.com/bigquery/docs/schema-detect) to
> create the table before running the Beam pipeline.
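>
> A rough sketch of that approach (the table name is invented; this assumes
> 'mydataset.mytable' was already created, e.g. with schema auto-detection on a
> small sample load, and that its columns match the dicts being written):
>
> import apache_beam as beam
>
> rows = [{'col_0': 1.0, 'col_1': 2.0}]  # one dict per BigQuery row
> (p | 'create rows' >> beam.Create(rows)
>    | 'write to existing table' >> beam.io.Write(beam.io.BigQuerySink(
>        'mydataset.mytable',
>        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
>        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))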
>
>
>> I think it was mentioned at the summit that there will be a way to
>> write to BQ without a schema. Is something like that on the roadmap?
>>
>
> I don't think supporting this is on Beam's immediate roadmap, but any
> contributions in this space are welcome.
>
>
>>
>> Best,
>> Eila
>>
>> Sent from my iPhone
>>
>> On Apr 2, 2018, at 7:33 PM, Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>> (moving dev to bcc)
>>
>> Hi Eila,
>>
>> On Mon, Apr 2, 2018 at 3:50 PM OrielResearch Eila Arich-Landkof <
>> e...@orielresearch.org> wrote:
>>
>>> Hi All,
>>>
>>> I was able to make it work by creating the PCollection with the numpy
>>> array. However, writing to BQ was not possible because it requires a
>>> schema.
>>> The code:
>>> (p | "create all" >> beam.Create(expression[1:5,1:5])
>>>    | "write all text" >> beam.io.WriteToText('gs://archs4/output/', file_name_suffix='.txt'))
>>>
>>> *Is there a workaround for providing a schema for beam.io.BigQuerySink?*
>>>
>>
>> Regarding your earlier question, you do need at least one element in the
>> input PCollection to trigger the ParDo (this can be a Create with a single
>> element that you ignore).
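>>
>> For example, something along these lines (EmitRowsDoFn below is just a
>> stand-in for your H5-reading DoFn):
>>
>> import apache_beam as beam
>>
>> class EmitRowsDoFn(beam.DoFn):
>>   """Ignores the dummy element and emits a few example rows."""
>>   def process(self, element):
>>     for i in range(3):
>>       yield {'index': i, 'value': float(i)}
>>
>> (p | 'seed' >> beam.Create([None])   # single dummy element, only used as a trigger
>>    | 'emit rows' >> beam.ParDo(EmitRowsDoFn()))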
>>
>> Not sure if I fully understood the BigQuery question. You have to specify
>> a schema when writing to a new BigQuery table. See the following example:
>>
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py#L1085
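>>
>> For a wide numpy array, the schema string can be generated programmatically;
>> a rough sketch (the table name and "col_N" column naming here are invented):
>>
>> import apache_beam as beam
>>
>> num_columns = 3  # stand-in for the array's actual column count
>> schema = ','.join('col_%d:FLOAT' % i for i in range(num_columns))
>> # schema == 'col_0:FLOAT,col_1:FLOAT,col_2:FLOAT'
>> rows = [{'col_0': 1.0, 'col_1': 2.0, 'col_2': 3.0}]  # one dict per BQ row
>>
>> (p | 'create rows' >> beam.Create(rows)
>>    | 'write rows' >> beam.io.Write(beam.io.BigQuerySink('mydataset.mytable', schema=schema)))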
>>
>>
>> Thanks,
>> Cham
>>
>>
>>
>>>
>>> Many thanks,
>>> Eila
>>>
>>> On Mon, Apr 2, 2018 at 11:33 AM, OrielResearch Eila Arich-Landkof <
>>> e...@orielresearch.org> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I would like to try a different way to leverage Apache Beam for H5 =>
>>>> BQ (file-to-table transfer).
>>>>
>>>> For my use case, I would like to read every 10K rows of H5 data (numpy
>>>> array format), transpose them, and write them to BQ as 10K columns (10K
>>>> is BQ's column limit).
>>>>
>>>> My code is below and fires the following error (I might have missed
>>>> something basic). I am not using beam.Create and am trying to create a
>>>> PCollection from the ParDo transform. Is this possible? If not, what is the
>>>> alternative (if any) for creating a PCollection from a numpy array?
>>>>
>>>> ERROR:root:Exception at bundle 
>>>> <apache_beam.runners.direct.bundle_factory._Bundle object at 
>>>> 0x7f00aad7b7a0>, due to an exception.
>>>>  Traceback (most recent call last):
>>>>   File 
>>>> "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py",
>>>>  line 307, in call
>>>>     side_input_values)
>>>>   File 
>>>> "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py",
>>>>  line 332, in attempt_call
>>>>     evaluator.start_bundle()
>>>>   File 
>>>> "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/transform_evaluator.py",
>>>>  line 540, in start_bundle
>>>>     self._applied_ptransform.inputs[0].windowing,
>>>> AttributeError: 'PBegin' object has no attribute 'windowing'
>>>>
>>>> ERROR:root:Giving up after 4 attempts.
>>>> WARNING:root:A task failed with exception: 'PBegin' object has no 
>>>> attribute 'windowing'
>>>> WARNING:root:A task failed with exception: 'PBegin' object has no 
>>>> attribute 'windowing'
>>>>
>>>>
>>>>
>>>> *Code:*
>>>>
>>>> options = PipelineOptions()
>>>> google_cloud_options = options.view_as(GoogleCloudOptions)
>>>> google_cloud_options.project = 'orielresearch-188115'
>>>> google_cloud_options.job_name = 'h5-to-bq-10K'
>>>> google_cloud_options.staging_location = 'gs://archs4/staging'
>>>> google_cloud_options.temp_location = 'gs://archs4/temp'
>>>> options.view_as(StandardOptions).runner = 'DirectRunner'
>>>>
>>>> p = beam.Pipeline(options=options)
>>>>
>>>> class read10kRowsDoFn(beam.DoFn):
>>>>   def process(self, element,index):
>>>>     print(index)
>>>>     row_start = index
>>>>     row_end = index+10000
>>>>     # slice out a 10K-row chunk (numpy.ndarray) and transpose it
>>>>     d = expression[row_start:row_end, :]
>>>>     d = np.transpose(d)
>>>>     return(d)
>>>>
>>>> #for i in range(0,expression.shape[0],10000):
>>>> k=210 # allows creating unique labels for the runner
>>>> for i in range(0,3,2): # test
>>>>   k+=1
>>>>   bigQuery_dataset_table_name = bigquery_dataset_name + '.' + bigquery_table_name + str(k)
>>>>   print(bigQuery_dataset_table_name)
>>>>   label_read_row = "read "+bigQuery_dataset_table_name
>>>>   label_write_col = "write "+bigQuery_dataset_table_name
>>>> *# is it possible to generate a PCollection with ParDo, without Create?*
>>>>   (p | label_read_row >> beam.ParDo(read10kRowsDoFn(i))
>>>>      | label_write_col >> beam.io.Write(beam.io.BigQuerySink(bigQuery_dataset_table_name)))
>>>>
>>>> p.run().wait_until_finish() *# fires an error*
>>>>
>>>> Many thanks,
>>>>
>>>> --
>>>> Eila
>>>> www.orielresearch.org
>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>
>>>
>>>
>>>
>>> --
>>> Eila
>>> www.orielresearch.org
>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>
>>
