Hi Kevin / Cham / all,

Thank you for your comments. I was not aware of the create_disposition
option. I am planning to publish the code for working with H5 for the
public use and will contribute to the open source project when I have more
resources available.

Following are my insights and questions:

*Pipeline - writing each sample's feature to BigQuery:*

*Schema - *
Pandas allows me to create an empty table with list of column names that I
can use as well - this will automatically generate the schema (the API is
gbq.to_gbq).
*Data size - *
Writing bulk data with Pandas BQ API, there is a limit to the size of the
dataFrame that is being written. I am looking for another solution to write
bulk data to BQ.
- I understand tha bigQuery sink is not the right solution - since it is
writing every row separately. Is that the right approach? In addition, to
my understanding, Sinking rows might be the most expensive way to work with
BQ. Writing columns should be cheaper (I am not expert in BQ, but I think
that BQ data structure recommends on using columns as much as possible to
reduce cost). *Is there a way to work with columns when sinking bulk data
to BQ?*
*Asynchronous writing to BQ:*
- This can be done with ParDo OR other BQ API that might be available (I am
looking for a different way now). *any other suggestions?*


*Pipeline - writing each sample's feature to a text file:*
The values are spread into 3 different files.  How can I identify the order
of the values. The file names are

-00001-of-00003
<https://storage.cloud.google.com/archs4/output/GSM678413/-00001-of-00003?_ga=2.142015026.-143276530.1498846823>,

-00000-of-00003
<https://storage.cloud.google.com/archs4/output/GSM678413/-00000-of-00003?_ga=2.106936835.-143276530.1498846823>,
-00002-of-00003
<https://storage.cloud.google.com/archs4/output/GSM678413/-00001-of-00003?_ga=2.142015026.-143276530.1498846823>.

 while -00001-of-00003
<https://storage.cloud.google.com/archs4/output/GSM678413/-00001-of-00003?_ga=2.142015026.-143276530.1498846823>
has the values at the beginning. *Am I missing something here?*



Thank you,

Eila



On Tue, Apr 3, 2018 at 12:27 PM, Kevin Peterson <kevi...@google.com> wrote:

> https://issues.apache.org/jira/browse/BEAM-3167 might be what you want
> for this to all work as part of the pipeline, but looks like that is for
> Java, not Python.
>
> Also, the pandas_gbq library has a function for generating a BQ schema
> <https://github.com/pydata/pandas-gbq/blob/master/pandas_gbq/_schema.py#L4>
> from a DataFrame. Maybe something like that would work for your data?
>
> On Mon, Apr 2, 2018 at 5:32 PM, Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>>
>>
>> On Mon, Apr 2, 2018 at 5:19 PM Eila Arich-Landkof <e...@orielresearch.org>
>> wrote:
>>
>>> Hi Cham,
>>>
>>> Thanks. I have created a PCollection from the dataset that is available
>>> in the H5 file which is provided as numpy array.
>>> It is very challenging for my use case to describe the schema. The
>>> original dimensions of the dataset are 70K x 30K . Any suggestion how to
>>> work around that?
>>>
>>>
>> Can you write to a pre-created table by using "create_disposition =
>> BigQueryDisposition.CREATE_NEVER" ? You can try to use BigQuery schema
>> auto detection (https://cloud.google.com/bigquery/docs/schema-detect) to
>> create the table before running the Beam pipeline.
>>
>>
>>> I think that it was mentioned at the summit that there will be a way to
>>> write to BQ without schema. Is something like that on the roadmap?
>>>
>>
>> I don't think supporting this is in the immediate road map of Beam but
>> any contributions in this space are welcome.
>>
>>
>>>
>>> Best,
>>> Eila
>>>
>>> Sent from my iPhone
>>>
>>> On Apr 2, 2018, at 7:33 PM, Chamikara Jayalath <chamik...@google.com>
>>> wrote:
>>>
>>> (moving dev to bcc)
>>>
>>> Hi Eila,
>>>
>>> On Mon, Apr 2, 2018 at 3:50 PM OrielResearch Eila Arich-Landkof <
>>> e...@orielresearch.org> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I was able to make it work by creating the PCollection with the numpy
>>>> array. However, writing to BQ was impossible because it requested for the
>>>> schema.
>>>> The code:
>>>> (p | "create all" >> beam.Create(expression[1:5,1:5])
>>>>    | "write all text" >> beam.io.WriteToText('gs://archs4/output/',
>>>> file_name_suffix='.txt'))
>>>>
>>>> *Is there a walk around for providing schema for beam.io
>>>> <http://beam.io>.BigQuerySink?*
>>>>
>>>
>>> Regarding your earlier question, you do need at least one element in the
>>> PCollection that triggers the ParDo to do any work (which can be a create
>>> with a single element that you ignore).
>>>
>>> Not sure if I fully understood the BigQuery question. You have to
>>> specify a schema when writing to a new BigQuery table. See following
>>> example,
>>>
>>> https://github.com/apache/beam/blob/master/sdks/python/apach
>>> e_beam/examples/snippets/snippets.py#L1085
>>>
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>
>>>>
>>>> Many thanks,
>>>> Eila
>>>>
>>>> On Mon, Apr 2, 2018 at 11:33 AM, OrielResearch Eila Arich-Landkof <
>>>> e...@orielresearch.org> wrote:
>>>>
>>>>> Hello all,
>>>>>
>>>>> I would like to try a different way to leverage Apache beam for H5 =>
>>>>> BQ (file to table transfer).
>>>>>
>>>>> For my use case, I would like to read every 10K rows of H5 data (numpy
>>>>> array format), transpose them and write them to BQ 10K columns. 10K is BQ
>>>>> columns limit.
>>>>>
>>>>> My code is below and fires the following error (I might have missed
>>>>> something basic). I am not using beam.Create and trying to create a
>>>>> PCollection from the ParDo transfer. is this posssible? if not, what is 
>>>>> the
>>>>> alternative for creating a PColleciton from numpy array? (if any)
>>>>>
>>>>> ERROR:root:Exception at bundle 
>>>>> <apache_beam.runners.direct.bundle_factory._Bundle object at 
>>>>> 0x7f00aad7b7a0>, due to an exception.
>>>>>  Traceback (most recent call last):
>>>>>   File 
>>>>> "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py",
>>>>>  line 307, in call
>>>>>     side_input_values)
>>>>>   File 
>>>>> "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py",
>>>>>  line 332, in attempt_call
>>>>>     evaluator.start_bundle()
>>>>>   File 
>>>>> "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/transform_evaluator.py",
>>>>>  line 540, in start_bundle
>>>>>     self._applied_ptransform.inputs[0].windowing,
>>>>> AttributeError: 'PBegin' object has no attribute 'windowing'
>>>>>
>>>>> ERROR:root:Giving up after 4 attempts.
>>>>> WARNING:root:A task failed with exception: 'PBegin' object has no 
>>>>> attribute 'windowing'
>>>>> WARNING:root:A task failed with exception: 'PBegin' object has no 
>>>>> attribute 'windowing'
>>>>>
>>>>>
>>>>>
>>>>> *Code:*
>>>>>
>>>>> options = PipelineOptions()
>>>>> google_cloud_options = options.view_as(GoogleCloudOptions)
>>>>> google_cloud_options.project = 'orielresearch-188115'
>>>>> google_cloud_options.job_name = 'h5-to-bq-10K'
>>>>> google_cloud_options.staging_location = 'gs://archs4/staging'
>>>>> google_cloud_options.temp_location = 'gs://archs4/temp'
>>>>> options.view_as(StandardOptions).runner = 'DirectRunner'
>>>>>
>>>>> p = beam.Pipeline(options=options)
>>>>>
>>>>> class read10kRowsDoFn(beam.DoFn):
>>>>>   def process(self, element,index):
>>>>>     print(index)
>>>>>     row_start = index
>>>>>     row_end = index+10000
>>>>>     # returns numpy array - numpy.ndarray
>>>>>     d = expression[row_start,row_end,:]
>>>>>     np.transpose(d)
>>>>>     return(d)
>>>>>
>>>>> #for i in range(0,expression.shape[0],10000):
>>>>> k=210 # allows creating unique labels for the runner
>>>>> for i in range(0,3,2): # test
>>>>>   k+=1
>>>>>   bigQuery_dataset_table_name=bigquery_dataset_name+'.'+bigque
>>>>> ry_table_name+str(k)
>>>>>   print(bigQuery_dataset_table_name)
>>>>>   label_read_row = "read "+bigQuery_dataset_table_name
>>>>>   label_write_col = "write "+bigQuery_dataset_table_name
>>>>> *# is this possible to generate a PCollection with ParDo without
>>>>> create?*
>>>>>   (p | label_read_row >> beam.ParDo(read10kRowsDoFn(i))
>>>>>      | label_write_col >> beam.io.Write(beam.io.BigQuery
>>>>> Sink(bigQuery_dataset_table_name)))
>>>>>
>>>>> p.run().wait_until_finish()* #fires an error*
>>>>>
>>>>> Many thanks,
>>>>>
>>>>> --
>>>>> Eila
>>>>> www.orielresearch.org
>>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Eila
>>>> www.orielresearch.org
>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>
>>>
>


-- 
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/

Reply via email to