Hi Cham,

    all_data = pcollections | beam.Flatten()

fires an error:

    TypeError: 'Read' object is not iterable

pcollections is the following list:

    [<Read(PTransform) label=[Read] at 0x7f9fa93d7410>,
     <Read(PTransform) label=[Read] at 0x7f9fa988a350>,
     <Read(PTransform) label=[Read] at 0x7f9fa93d72d0>,
     <Read(PTransform) label=[Read] at 0x7f9fa93d70d0>]

Based on the following, I converted the list to a tuple (tuple(*pcollections)) and got the same error for the tuple.

    # Flatten takes a tuple of PCollection objects.
    # Returns a single PCollection that contains all of the elements in the PCollection objects in that tuple.
    merged = (
        (pcoll1, pcoll2, pcoll3)
        # A list of tuples can be "piped" directly into a Flatten transform.
        | beam.Flatten())

Any advice?

Many thanks,
Eila

On Wed, Mar 21, 2018 at 9:16 AM, OrielResearch Eila Arich-Landkof <e...@orielresearch.org> wrote:

> Very helpful!!! I will keep you posted if I have any issue / question.
> Best,
> Eila
>
> On Tue, Mar 20, 2018 at 5:08 PM, Chamikara Jayalath <chamik...@google.com> wrote:
>
>> On Tue, Mar 20, 2018 at 12:54 PM OrielResearch Eila Arich-Landkof <e...@orielresearch.org> wrote:
>>
>>> Hi Cham,
>>>
>>> Please see inline. If possible, code / pseudo code will help a lot.
>>> Thanks,
>>> Eila
>>>
>>> On Tue, Mar 20, 2018 at 1:15 PM, Chamikara Jayalath <chamik...@google.com> wrote:
>>>
>>>> Hi Eila,
>>>>
>>>> Please find my comments inline.
>>>>
>>>> On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <e...@orielresearch.org> wrote:
>>>>
>>>>> Hello all,
>>>>>
>>>>> It was nice to meet you last week!!!
>>>>>
>>>> It was nice to meet you as well :)
>>>>
>>>>> I am writing a genomic PCollection that is created from BigQuery to a
>>>>> folder. Following is the code with output so you can run it with any small
>>>>> BQ table and let me know what your thoughts are:
>>>>>
>>>>> This init is only for debugging.
>>>>> In production I will use the pipeline syntax
>>>>>
>>>>> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},
>>>>>         {u'index': u'GSM2316666', u'SNRPCP14': 0},
>>>>>         {u'index': u'GSM2312355', u'SNRPCP14': 0},
>>>>>         {u'index': u'GSM2312372', u'SNRPCP14': 0}]
>>>>>
>>>>> rows[1].keys()
>>>>> # output: [u'index', u'SNRPCP14']
>>>>>
>>>>> # you can change `archs4.results_20180308_*` to any other table name
>>>>> # with an index column
>>>>> queries2 = rows | beam.Map(lambda x: (
>>>>>     beam.io.Read(beam.io.BigQuerySource(
>>>>>         project='orielresearch-188115',
>>>>>         use_standard_sql=False,
>>>>>         query=str('SELECT * FROM `archs4.results_20180308_*` where index=\'%s\'' % (x["index"])))),
>>>>>     str('gs://archs4/output/' + x["index"] + '/')))
>>>>>
>>>> I don't think the above code will work (not portable across runners at
>>>> least). BigQuerySource (along with the Read transform) has to be applied to a
>>>> Pipeline object. So probably change this to a for loop that creates a set
>>>> of read transforms and use Flatten to create a single PCollection.
>>>>
>>> For debug, I am running on the local datalab runner. For production,
>>> I will be running only the dataflow runner. I think that I was able to query
>>> the tables that way, I will double check it. The indexes could go to
>>> millions - my concern is that I will not be able to leverage Beam's
>>> distribution capability when I use the loop option. Any thoughts on
>>> that?
>>>
>> You mean you'll have millions of queries. That will not be scalable. My
>> suggestion was to loop on queries. Can you reduce to one or a small number
>> of queries and perform further processing in Beam?
>>
>>>>> queries2
>>>>> # output: a list of pCollection and the path to write the pCollection
>>>>> # data to
>>>>>
>>>>> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>, 'gs://archs4/output/GSM2313641/'),
>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb950>, 'gs://archs4/output/GSM2316666/'),
>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>, 'gs://archs4/output/GSM2312355/'),
>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>, 'gs://archs4/output/GSM2312372/')]
>>>>>
>>>> What you got here is a PCollection of PTransform objects which is not
>>>> useful.
>>>>
>>>>> # this is my challenge
>>>>> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND COLUMN")
>>>>>
>>>> Once you update the above code you will get a proper PCollection of
>>>> elements read from BigQuery. You can transform and write this (to files,
>>>> BQ, or any other sink) as needed.
>>>>
>>> It is a list of tuples with a PCollection and the path to write to. The
>>> path is not unique and I might have more than one PCollection written to
>>> the same destination. How do I pass the path from the tuple list as a
>>> parameter to the text file name? Could you please add the code that you
>>> were thinking about?
>>>
>> Python SDK does not support writing to different files based on the
>> values of data (dynamic writes). So you'll have to either partition data
>> into separate PCollections or write all data into the same location.
>>
>> Here's *pseudocode* (untested) for reading from a few queries,
>> partitioning into several PCollections, and writing to different
>> destinations.
>>
>> queries = ['select * from A', 'select * from B',....]
>>
>> p = Pipeline()
>> pcollections = []
>> for query in queries:
>>   pc = p | beam.io.Read(beam.io.BigQuerySource(query=query))
>>   pcollections.append(pc)
>>
>> all_data = pcollections | beam.Flatten()
>> partitions = all_data | beam.Partition(my_partition_fn)
>> for i, partition in enumerate(partitions):
>>   partition | beam.io.WriteToText(<unique path for partition i>)
>>
>> Hope this helps.
>>
>> Thanks,
>> Cham
>>
>>>> Please see programming guide on how to write to text files (section 5.3
>>>> and click Python tab): https://beam.apache.org/documentation/programming-guide/
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>>> Do you have any idea how to sink the data to a text file? I have tried
>>>>> few other options and was stuck at the write transform
>>>>>
>>>>> Any advice is very appreciated.
>>>>>
>>>>> Thanks,
>>>>> Eila
>>>>>
>>>>> --
>>>>> Eila
>>>>> www.orielresearch.org
>>>>> https://www.meetup.com/Deep-Learning-In-Production/

--
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/
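
A note on the TypeError reported at the top of this message: the list that was printed holds Read transforms, not PCollections, so it looks as if each Read was appended without first being applied to the pipeline (the "p | ..." step in Cham's pseudocode). Flatten then most likely treats each list entry as plain data and tries to iterate over it, which would explain "'Read' object is not iterable". Below is a minimal, untested sketch of the read / flatten / partition / write flow under that assumption. The helper names (make_partition_fn, output_prefix, build_pipeline), the step labels, and the 'part' file prefix are hypothetical and not from the thread. The sketch also assumes a small, known list of index values, passes the partition count to beam.Partition, and gives every step a unique label, since reusing one label inside a loop is rejected by Beam.

```python
def make_partition_fn(indexes):
    """Return a partition function for a small, known list of index values."""
    position = {index: i for i, index in enumerate(indexes)}

    def partition_fn(row, num_partitions):
        # beam.Partition calls this with (element, num_partitions) and expects
        # an int in [0, num_partitions). Raises KeyError for an unknown index.
        return position[row['index']]

    return partition_fn


def output_prefix(index):
    # Folder layout taken from the thread; the trailing 'part' file prefix
    # is an assumption made for this sketch.
    return 'gs://archs4/output/%s/part' % index


def build_pipeline(pipeline, queries, indexes):
    """Wire read -> flatten -> partition -> write onto an existing Pipeline."""
    # Imported here so the two helpers above can be used without Beam installed.
    import apache_beam as beam

    pcollections = []
    for i, query in enumerate(queries):
        # Applying the Read to the pipeline yields a PCollection; appending the
        # bare Read transform instead leaves Flatten with nothing to merge.
        pc = pipeline | 'Read %d' % i >> beam.io.Read(
            beam.io.BigQuerySource(query=query))
        pcollections.append(pc)

    all_data = pcollections | 'Merge' >> beam.Flatten()
    partitions = all_data | 'Split' >> beam.Partition(
        make_partition_fn(indexes), len(indexes))
    for index, partition in zip(indexes, partitions):
        partition | 'Write %s' % index >> beam.io.WriteToText(
            output_prefix(index))
```

With millions of index values this one-partition-per-index layout would not scale, which matches Cham's advice to reduce the work to one or a few queries; the sketch only illustrates the wiring for a handful of destinations.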