(moving dev to bcc)

Hi Eila,

On Mon, Apr 2, 2018 at 3:50 PM OrielResearch Eila Arich-Landkof <
[email protected]> wrote:

> Hi All,
>
> I was able to make it work by creating the PCollection from the numpy
> array. However, writing to BQ was not possible because it asked for a
> schema.
> The code:
> (p | "create all" >> beam.Create(expression[1:5,1:5])
>    | "write all text" >> beam.io.WriteToText('gs://archs4/output/',
>                                              file_name_suffix='.txt'))
>
> *Is there a workaround for providing a schema for beam.io.BigQuerySink?*
>

Regarding your earlier question, you do need at least one element in the
PCollection that triggers the ParDo to do any work (which can be a create
with a single element that you ignore).
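
To illustrate, here is a rough sketch (not tested against your data) of seeding the pipeline with beam.Create so the downstream step has input. Instead of a single dummy element, it creates one element per block start index, which gives the same effect. The names block_starts, read_block and run are made up for this example, and plain nested lists stand in for your numpy array (with numpy you would use block.T for the transpose). Passing data as an extra argument is only reasonable for small inputs.

```python
def block_starts(n_rows, block_size):
    """Start index of each block of rows, e.g. 0, 10000, 20000, ..."""
    return list(range(0, n_rows, block_size))


def read_block(row_start, data, block_size):
    """Yield the columns of the block of rows that starts at row_start.

    Each yielded element is one column of the block (i.e. one row of the
    transposed block), as a plain list.
    """
    block = data[row_start:row_start + block_size]
    # zip(*block) transposes a list of rows.
    for column in zip(*block):
        yield list(column)


def run(data, block_size):
    """Build and run a small pipeline that prints every transposed row."""
    # Imported here so the helpers above can be tried without Beam installed.
    import apache_beam as beam

    with beam.Pipeline() as p:
        (p
         # Create gives the pipeline its initial elements; without it the
         # next step would be applied to PBegin and have nothing to process.
         | "block starts" >> beam.Create(block_starts(len(data), block_size))
         | "read block" >> beam.FlatMap(read_block, data, block_size)
         | "print" >> beam.Map(print))
```

Calling run(data, 10000) would then process the data one 10K-row block at a time.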

Not sure if I fully understood the BigQuery question. You have to specify a
schema when writing to a new BigQuery table. See the following example:

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py#L1085
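
As a rough sketch of what specifying a schema can look like (an illustration under assumptions, not the exact code from the snippet above): the schema can be given as a 'name:TYPE,name:TYPE' string, and each element written must be a dict keyed by column name. The helpers make_schema, to_row and write_to_bq are hypothetical names, every column is assumed to be FLOAT, and the column names are assumed to be valid BigQuery field names. The sketch uses beam.io.WriteToBigQuery; BigQuerySink takes a schema argument in the same style.

```python
def make_schema(column_names, column_type="FLOAT"):
    """Build a 'name:TYPE,name:TYPE' schema string, one field per column."""
    return ",".join("{}:{}".format(name, column_type) for name in column_names)


def to_row(column_names, values):
    """Turn one sequence of values into a dict keyed by column name.

    float() converts numpy scalars to plain Python floats so that the row
    can be serialized.
    """
    return {name: float(value) for name, value in zip(column_names, values)}


def write_to_bq(rows, table, column_names):
    """Write a PCollection of value sequences to `table` ('dataset.table')."""
    # Imported here so the helpers above can be tried without Beam installed.
    import apache_beam as beam

    return (rows
            | "to dict" >> beam.Map(lambda values: to_row(column_names, values))
            | "write" >> beam.io.WriteToBigQuery(
                table,
                schema=make_schema(column_names),
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
```

For example, make_schema(['s1', 's2']) gives 's1:FLOAT,s2:FLOAT'.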


Thanks,
Cham



>
> Many thanks,
> Eila
>
> On Mon, Apr 2, 2018 at 11:33 AM, OrielResearch Eila Arich-Landkof <
> [email protected]> wrote:
>
>> Hello all,
>>
>> I would like to try a different way to leverage Apache Beam for H5 => BQ
>> (file to table transfer).
>>
>> For my use case, I would like to read every 10K rows of H5 data (numpy
>> array format), transpose them and write them to BQ as 10K columns. 10K is
>> the BQ column limit.
>>
>> My code is below and fires the following error (I might have missed
>> something basic). I am not using beam.Create and am trying to create a
>> PCollection from the ParDo transform. Is this possible? If not, what is the
>> alternative for creating a PCollection from a numpy array (if any)?
>>
>> ERROR:root:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f00aad7b7a0>, due to an exception.
>>  Traceback (most recent call last):
>>   File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 307, in call
>>     side_input_values)
>>   File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 332, in attempt_call
>>     evaluator.start_bundle()
>>   File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 540, in start_bundle
>>     self._applied_ptransform.inputs[0].windowing,
>> AttributeError: 'PBegin' object has no attribute 'windowing'
>>
>> ERROR:root:Giving up after 4 attempts.
>> WARNING:root:A task failed with exception: 'PBegin' object has no attribute 'windowing'
>> WARNING:root:A task failed with exception: 'PBegin' object has no attribute 'windowing'
>>
>>
>>
>> *Code:*
>>
>> options = PipelineOptions()
>> google_cloud_options = options.view_as(GoogleCloudOptions)
>> google_cloud_options.project = 'orielresearch-188115'
>> google_cloud_options.job_name = 'h5-to-bq-10K'
>> google_cloud_options.staging_location = 'gs://archs4/staging'
>> google_cloud_options.temp_location = 'gs://archs4/temp'
>> options.view_as(StandardOptions).runner = 'DirectRunner'
>>
>> p = beam.Pipeline(options=options)
>>
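>> class read10kRowsDoFn(beam.DoFn):
>>   def __init__(self, index):
>>     # first row of the 10K-row block this instance reads
>>     self.index = index
>>
>>   def process(self, element):
>>     print(self.index)
>>     row_start = self.index
>>     row_end = self.index+10000
>>     # returns numpy array - numpy.ndarray
>>     d = expression[row_start:row_end,:]
>>     # np.transpose returns a new array view, so return its result
>>     return np.transpose(d)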
>>
>> #for i in range(0,expression.shape[0],10000):
>> k=210 # allows creating unique labels for the runner
>> for i in range(0,3,2): # test
>>   k+=1
>>   bigQuery_dataset_table_name=bigquery_dataset_name+'.'+bigquery_table_name+str(k)
>>   print(bigQuery_dataset_table_name)
>>   label_read_row = "read "+bigQuery_dataset_table_name
>>   label_write_col = "write "+bigQuery_dataset_table_name
>>   # is this possible to generate a PCollection with ParDo without create?
>>   (p | label_read_row >> beam.ParDo(read10kRowsDoFn(i))
>>      | label_write_col >> beam.io.Write(
>>          beam.io.BigQuerySink(bigQuery_dataset_table_name)))
>>
>> p.run().wait_until_finish()  # fires an error
>>
>> Many thanks,
>>
>> --
>> Eila
>> www.orielresearch.org
>> https://www.meetup.com/Deep-Learning-In-Production/
>>
>
>
>
> --
> Eila
> www.orielresearch.org
> https://www.meetup.com/Deep-Learning-In-Production/
>
