Hi Cham,

*all_data = pcollections | beam.Flatten()*

fires an error:

TypeError: 'Read' object is not iterable


pcollections is the following list:

[<Read(PTransform) label=[Read] at 0x7f9fa93d7410>,
 <Read(PTransform) label=[Read] at 0x7f9fa988a350>,
 <Read(PTransform) label=[Read] at 0x7f9fa93d72d0>,
 <Read(PTransform) label=[Read] at 0x7f9fa93d70d0>]


Based on the following, i converted the list to tuples
(tuple(*pcollections)) with the same error for tuple.*


# Flatten takes a tuple of PCollection objects.# Returns a single
PCollection that contains all of the elements in the PCollection
objects in that tuple.merged = (
    (pcoll1, pcoll2, pcoll3)
    # A list of tuples can be "piped" directly into a Flatten transform.
    | beam.Flatten())


Any advice?

Many thanks,
Eila


On Wed, Mar 21, 2018 at 9:16 AM, OrielResearch Eila Arich-Landkof <
e...@orielresearch.org> wrote:

> very helpful!!! i will keep you posted if I have any issue / question
> Best,
> Eila
>
>
> On Tue, Mar 20, 2018 at 5:08 PM, Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>>
>>
>> On Tue, Mar 20, 2018 at 12:54 PM OrielResearch Eila Arich-Landkof <
>> e...@orielresearch.org> wrote:
>>
>>> Hi Cham,
>>>
>>> Please see inline. If possible, code / pseudo code will help a lot.
>>> Thanks,
>>> Eila
>>>
>>> On Tue, Mar 20, 2018 at 1:15 PM, Chamikara Jayalath <
>>> chamik...@google.com> wrote:
>>>
>>>> Hi Eila,
>>>>
>>>> Please find my comments inline.
>>>>
>>>> On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <
>>>> e...@orielresearch.org> wrote:
>>>>
>>>>> Hello all,
>>>>>
>>>>> It was nice to meet you last week!!!
>>>>>
>>>>>
>>>> It was nice to meet you as well :)
>>>>
>>>>
>>>>> I am writing genomic pCollection that is created from bigQuery to a
>>>>> folder. Following is the code with output so you can run it with any small
>>>>> BQ table and let me know what your thoughts are:
>>>>>
>>>>> This init is only for debugging. In production I will use the pipeline
>>> syntax
>>>
>>>> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},{u'index':
>>>>> u'GSM2316666', u'SNRPCP14': 0},{u'index': u'GSM2312355', u'SNRPCP14':
>>>>> 0},{u'index': u'GSM2312372', u'SNRPCP14': 0}]
>>>>>
>>>>> rows[1].keys()
>>>>> # output:  [u'index', u'SNRPCP14']
>>>>>
>>>>> # you can change `archs4.results_20180308_ to any other table name
>>>>> with index column
>>>>> queries2 = rows | beam.Map(lambda x: (beam.io.Read(beam.io.BigQuery
>>>>> Source(project='orielresearch-188115', use_standard_sql=False,
>>>>> query=str('SELECT * FROM `archs4.results_20180308_*` where index=\'%s\'' %
>>>>> (x["index"])))),
>>>>>                                str('gs://archs4/output/'+x["
>>>>> index"]+'/')))
>>>>>
>>>>
>>>> I don't think above code will work (not portable across runners at
>>>> least). BigQuerySource (along with Read transform) have to be applied to a
>>>> Pipeline object. So probably change this to a for loop that creates a set
>>>> of read transforms and use Flatten to create a single PCollection.
>>>>
>>> For debug, I am running on the local datalab runner. For the production,
>>> I will be running only dataflow runner. I think that I was able to query
>>> the tables that way, I will double check it. The indexes could go to
>>> millions - my concern is that I will not be able to leverage on Beam
>>> distribution capability when I use the the loop option. Any thoughts on
>>> that?
>>>
>>
>> You mean you'll have millions of queries. That will not be scalable. My
>> suggestion was to loop on queries. Can you reduce to one or a small number
>> of queries and perform further processing in Beam ?
>>
>>
>>>
>>>>
>>>>>
>>>>> queries2
>>>>> # output: a list of pCollection and the path to write the pCollection
>>>>> data to
>>>>>
>>>>> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>,
>>>>>   'gs://archs4/output/GSM2313641/'),
>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb950>,
>>>>>   'gs://archs4/output/GSM2316666/'),
>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>,
>>>>>   'gs://archs4/output/GSM2312355/'),
>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>,
>>>>>   'gs://archs4/output/GSM2312372/')]
>>>>>
>>>>>
>>>> What you got here is a PCollection of PTransform objects which is not
>>>> useful.
>>>>
>>>>
>>>>>
>>>>> *# this is my challenge*
>>>>> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND
>>>>> COLUMN")
>>>>>
>>>>>
>>>> Once you update above code you will get a proper PCollection of
>>>> elements read from BigQuery. You can transform and write this (to files,
>>>> BQ, or any other sink) as needed.
>>>>
>>>
>>> it is a list of tupples with PCollection and the path to write to. the
>>> path is not unique and I might have more than one PCollection written to
>>> the same destination. How do I pass the path from the tupple list as a
>>> parameter to the text file name? Could you please add the code that you
>>> were thinking about?
>>>
>>
>> Python SDK does not support writing to different files based on the
>> values of data (dynamic writes). So you'll have to either partition data
>> into separate PCollections  or write all data into the same location.
>>
>> Here's *pseudocode* (untested) for reading from few queries,
>> partitioning into several PCollections, and writing to different
>> destinations.
>>
>> *queries = ['select * from A', 'select * from B',....]*
>>
>> *p = Pipeline()*
>> *pcollections = []*
>> *for query in queries:*
>> *  pc = p | beam.io.Read(beam.io
>> <http://beam.io/>.BigQuerySource(query=query))*
>> * pcollections.append(pc)*
>>
>> *all_data = pcollections | beam.Flatten()*
>> *partitions = all_data | beam.Partition(my_partition_fn)*
>> *for i, partition in enumerate(partitions):*
>> *  partition | beam.io.WriteToText(<unique path for partition i>)*
>> Hope this helps.
>>
>> Thanks,
>> Cham
>>
>>
>>
>>> Please see programming guide on how to write to text files (section 5.3
>>>> and click Python tab): https://beam.apache.org/
>>>> documentation/programming-guide/
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>>
>>>>> Do you have any idea how to sink the data to a text file? I have tried
>>>>> few other options and was stuck at the write transform
>>>>>
>>>>> Any advice is very appreciated.
>>>>>
>>>>> Thanks,
>>>>> Eila
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Eila
>>>>> www.orielresearch.org
>>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>>
>>>>
>>>
>>>
>>> --
>>> Eila
>>> www.orielresearch.org
>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>
>>
>
>
> --
> Eila
> www.orielresearch.org
> https://www.meetup.com/Deep-Learning-In-Production/
>



-- 
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/

Reply via email to