On Wed, Mar 21, 2018 at 7:53 AM OrielResearch Eila Arich-Landkof <
e...@orielresearch.org> wrote:

> Hi Cham,
>
> *all_data = pcollections | beam.Flatten()*
>
> fires an error:
>
> TypeError: 'Read' object is not iterable
>
>
> pcollections is the following list:
>
> [<Read(PTransform) label=[Read] at 0x7f9fa93d7410>,
>  <Read(PTransform) label=[Read] at 0x7f9fa988a350>,
>  <Read(PTransform) label=[Read] at 0x7f9fa93d72d0>,
>  <Read(PTransform) label=[Read] at 0x7f9fa93d70d0>]
>
>
>
Did you omit "p | " in "p | beam.io.Read" by any chance? Not sure how you
ended up with a list of Read PTransforms otherwise.

Also, follow everything with a "p.run().wait_until_finish()" for the
pipeline to execute.

Can you paste the code that you are running ?


> Based on the following, i converted the list to tuples (tuple(*pcollections)) 
> with the same error for tuple.*
>
>
> # Flatten takes a tuple of PCollection objects.
> # Returns a single PCollection that contains all of the elements in the
> # PCollection objects in that tuple.
> merged = (
>     (pcoll1, pcoll2, pcoll3)
>     # A list of tuples can be "piped" directly into a Flatten transform.
>     | beam.Flatten())
>
>
> Any advice?
>
> Many thanks,
> Eila
>
>
> On Wed, Mar 21, 2018 at 9:16 AM, OrielResearch Eila Arich-Landkof <
> e...@orielresearch.org> wrote:
>
>> very helpful!!! i will keep you posted if I have any issue / question
>> Best,
>> Eila
>>
>>
>> On Tue, Mar 20, 2018 at 5:08 PM, Chamikara Jayalath <
>> chamik...@google.com> wrote:
>>
>>>
>>>
>>> On Tue, Mar 20, 2018 at 12:54 PM OrielResearch Eila Arich-Landkof <
>>> e...@orielresearch.org> wrote:
>>>
>>>> Hi Cham,
>>>>
>>>> Please see inline. If possible, code / pseudo code will help a lot.
>>>> Thanks,
>>>> Eila
>>>>
>>>> On Tue, Mar 20, 2018 at 1:15 PM, Chamikara Jayalath <
>>>> chamik...@google.com> wrote:
>>>>
>>>>> Hi Eila,
>>>>>
>>>>> Please find my comments inline.
>>>>>
>>>>> On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <
>>>>> e...@orielresearch.org> wrote:
>>>>>
>>>>>> Hello all,
>>>>>>
>>>>>> It was nice to meet you last week!!!
>>>>>>
>>>>>>
>>>>> It was nice to meet you as well :)
>>>>>
>>>>>
>>>>>> I am writing genomic pCollection that is created from bigQuery to a
>>>>>> folder. Following is the code with output so you can run it with any
>>>>>> small BQ table and let me know what your thoughts are:
>>>>>>
>>>>>> This init is only for debugging. In production I will use the
>>>>>> pipeline syntax
>>>>>>
>>>>>> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},{u'index':
>>>>>> u'GSM2316666', u'SNRPCP14': 0},{u'index': u'GSM2312355', u'SNRPCP14':
>>>>>> 0},{u'index': u'GSM2312372', u'SNRPCP14': 0}]
>>>>>>
>>>>>> rows[1].keys()
>>>>>> # output:  [u'index', u'SNRPCP14']
>>>>>>
>>>>>> # you can change `archs4.results_20180308_*` to any other table name
>>>>>> # with index column
>>>>>> queries2 = rows | beam.Map(lambda x: 
>>>>>> (beam.io.Read(beam.io.BigQuerySource(project='orielresearch-188115',
>>>>>> use_standard_sql=False, query=str('SELECT * FROM
>>>>>> `archs4.results_20180308_*` where index=\'%s\'' % (x["index"])))),
>>>>>>
>>>>>>  str('gs://archs4/output/'+x["index"]+'/')))
>>>>>>
>>>>>
>>>>> I don't think the above code will work (not portable across runners at
>>>>> least). BigQuerySource (along with the Read transform) has to be applied
>>>>> to a Pipeline object. So probably change this to a for loop that creates
>>>>> a set of read transforms and use Flatten to create a single PCollection.
>>>>>
>>>> For debug, I am running on the local datalab runner. For
>>>> production, I will be running only the dataflow runner. I think that I was
>>>> able to query the tables that way, I will double check it. The indexes
>>>> could go to millions - my concern is that I will not be able to leverage
>>>> Beam's distribution capability when I use the loop option. Any thoughts on
>>>> that?
>>>>
>>>
>>> You mean you'll have millions of queries. That will not be scalable. My
>>> suggestion was to loop on queries. Can you reduce to one or a small number
>>> of queries and perform further processing in Beam ?
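
One way to get from one query per index down to a small number of queries is to batch the index values into `IN (...)` clauses. The sketch below is only an illustration: the helper name `build_batched_queries`, the batch size, and the rule that index values must be alphanumeric are assumptions, not something from this thread. The table and column names are the ones used earlier in the thread. Backtick-quoted table names are standard SQL syntax, so this assumes the query is run with standard SQL enabled.

```python
def build_batched_queries(indexes, table, batch_size=1000):
    """Return a list of queries, each selecting at most batch_size indexes.

    Hypothetical helper: the name, signature and batch size are illustrative.
    """
    queries = []
    for start in range(0, len(indexes), batch_size):
        batch = indexes[start:start + batch_size]
        for value in batch:
            # Assumption: index values are plain alphanumeric IDs. Rejecting
            # anything else avoids having to escape quotes in the SQL string.
            if not value.isalnum():
                raise ValueError("unexpected index value: %r" % value)
        quoted = ", ".join("'%s'" % value for value in batch)
        queries.append(
            "SELECT * FROM `%s` WHERE index IN (%s)" % (table, quoted))
    return queries


indexes = ["GSM2313641", "GSM2316666", "GSM2312355", "GSM2312372"]
queries = build_batched_queries(
    indexes, "archs4.results_20180308_*", batch_size=2)

for query in queries:
    print(query)
```

Each resulting query string could then be read once in the pipeline, with any per-index processing done in Beam on the rows that come back.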
>>>
>>>
>>>>
>>>>>
>>>>>>
>>>>>> queries2
>>>>>> # output: a list of pCollection and the path to write the pCollection
>>>>>> data to
>>>>>>
>>>>>> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>,
>>>>>>   'gs://archs4/output/GSM2313641/'),
>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb950>,
>>>>>>   'gs://archs4/output/GSM2316666/'),
>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>,
>>>>>>   'gs://archs4/output/GSM2312355/'),
>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>,
>>>>>>   'gs://archs4/output/GSM2312372/')]
>>>>>>
>>>>>>
>>>>> What you got here is a PCollection of PTransform objects which is not
>>>>> useful.
>>>>>
>>>>>
>>>>>>
>>>>>> *# this is my challenge*
>>>>>> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND
>>>>>> COLUMN")
>>>>>>
>>>>>>
>>>>> Once you update above code you will get a proper PCollection of
>>>>> elements read from BigQuery. You can transform and write this (to files,
>>>>> BQ, or any other sink) as needed.
>>>>>
>>>>
>>>> It is a list of tuples with a PCollection and the path to write to. The
>>>> path is not unique and I might have more than one PCollection written to
>>>> the same destination. How do I pass the path from the tuple list as a
>>>> parameter to the text file name? Could you please add the code that you
>>>> were thinking about?
>>>>
>>>
>>> Python SDK does not support writing to different files based on the
>>> values of data (dynamic writes). So you'll have to either partition data
>>> into separate PCollections  or write all data into the same location.
>>>
>>> Here's *pseudocode* (untested) for reading from a few queries,
>>> partitioning into several PCollections, and writing to different
>>> destinations.
>>>
>>> *import apache_beam as beam*
>>>
>>> *queries = ['select * from A', 'select * from B',....]*
>>>
>>> *p = beam.Pipeline()*
>>> *pcollections = []*
>>> *for i, query in enumerate(queries):*
>>> *  # Each application of a transform needs a unique label.*
>>> *  pc = p | 'Read %d' % i >> beam.io.Read(*
>>> *      beam.io.BigQuerySource(query=query))*
>>> *  pcollections.append(pc)*
>>>
>>> *all_data = pcollections | beam.Flatten()*
>>> *# my_partition_fn(element, num_partitions) returns an int below num_partitions.*
>>> *partitions = all_data | beam.Partition(my_partition_fn, num_partitions)*
>>> *for i, partition in enumerate(partitions):*
>>> *  partition | 'Write %d' % i >> beam.io.WriteToText(<unique path for partition i>)*
>>>
>>> *p.run().wait_until_finish()*
>>>
>>> Hope this helps.
>>>
>>> Thanks,
>>> Cham
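
The pseudocode above leaves `my_partition_fn` undefined. Below is a minimal sketch of what such a function could look like, assuming the set of index values is small and known up front. `beam.Partition` calls the function with the element and the number of partitions and expects an integer in `range(num_partitions)`. The factory `make_partition_fn`, the `known_indexes` list and the `output_paths` list are hypothetical names introduced only for illustration. The row shape and the `gs://archs4/output/` prefix are taken from earlier in the thread.

```python
def make_partition_fn(known_indexes):
    """Build a partition function for a fixed, known list of index values.

    Hypothetical helper: the returned function has the
    (element, num_partitions) signature that beam.Partition calls.
    """
    position = {index: i for i, index in enumerate(known_indexes)}

    def partition_fn(row, num_partitions):
        partition = position[row["index"]]
        # The result must lie in range(num_partitions).
        if partition >= num_partitions:
            raise ValueError("partition %d out of range" % partition)
        return partition

    return partition_fn


known_indexes = ["GSM2313641", "GSM2316666", "GSM2312355", "GSM2312372"]
my_partition_fn = make_partition_fn(known_indexes)

# One output location per partition, in the same order as known_indexes.
output_paths = ["gs://archs4/output/%s/" % index for index in known_indexes]

# Plain-Python check of the mapping, without running a pipeline.
rows = [
    {"index": "GSM2316666", "SNRPCP14": 0},
    {"index": "GSM2313641", "SNRPCP14": 0},
]
assigned = [my_partition_fn(row, len(known_indexes)) for row in rows]
print(assigned)
```

In the pipeline this would be passed as `beam.Partition(my_partition_fn, len(known_indexes))`, and partition `i` would be written to `output_paths[i]`. This only suits a modest number of destinations, since every partition becomes its own write step.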
>>>
>>>
>>>
>>>>> Please see programming guide on how to write to text files (section 5.3
>>>>> and click Python tab):
>>>>> https://beam.apache.org/documentation/programming-guide/
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>>
>>>>>> Do you have any idea how to sink the data to a text file? I have
>>>>>> tried few other options and was stuck at the write transform
>>>>>>
>>>>>> Any advice is very appreciated.
>>>>>>
>>>>>> Thanks,
>>>>>> Eila
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Eila
>>>>>> www.orielresearch.org
>>>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>>>
