On Wed, Mar 21, 2018 at 7:53 AM OrielResearch Eila Arich-Landkof <e...@orielresearch.org> wrote:
> Hi Cham,
>
> *all_data = pcollections | beam.Flatten()*
>
> fires an error:
>
> TypeError: 'Read' object is not iterable
>
> pcollections is the following list:
>
> [<Read(PTransform) label=[Read] at 0x7f9fa93d7410>,
>  <Read(PTransform) label=[Read] at 0x7f9fa988a350>,
>  <Read(PTransform) label=[Read] at 0x7f9fa93d72d0>,
>  <Read(PTransform) label=[Read] at 0x7f9fa93d70d0>]

Did you omit "p | " in "p | beam.io.Read" by any chance? I'm not sure how you ended up with a list of Read PTransforms otherwise. Also, follow everything with "p.run().wait_until_finish()" for the pipeline to execute. Can you paste the code that you are running?

> Based on the following, I converted the list to a tuple (tuple(pcollections)),
> with the same error for the tuple:
>
> # Flatten takes a tuple of PCollection objects.
> # Returns a single PCollection that contains all of the elements in the
> # PCollection objects in that tuple.
> merged = (
>     (pcoll1, pcoll2, pcoll3)
>     # A list of tuples can be "piped" directly into a Flatten transform.
>     | beam.Flatten())
>
> Any advice?
>
> Many thanks,
> Eila
>
> On Wed, Mar 21, 2018 at 9:16 AM, OrielResearch Eila Arich-Landkof <e...@orielresearch.org> wrote:
>
>> Very helpful!!! I will keep you posted if I have any issue / question.
>> Best,
>> Eila
>>
>> On Tue, Mar 20, 2018 at 5:08 PM, Chamikara Jayalath <chamik...@google.com> wrote:
>>
>>> On Tue, Mar 20, 2018 at 12:54 PM OrielResearch Eila Arich-Landkof <e...@orielresearch.org> wrote:
>>>
>>>> Hi Cham,
>>>>
>>>> Please see inline. If possible, code / pseudocode will help a lot.
>>>> Thanks,
>>>> Eila
>>>>
>>>> On Tue, Mar 20, 2018 at 1:15 PM, Chamikara Jayalath <chamik...@google.com> wrote:
>>>>
>>>>> Hi Eila,
>>>>>
>>>>> Please find my comments inline.
>>>>>
>>>>> On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <e...@orielresearch.org> wrote:
>>>>>
>>>>>> Hello all,
>>>>>>
>>>>>> It was nice to meet you last week!!!
>>>>>
>>>>> It was nice to meet you as well :)
>>>>>
>>>>>> I am writing a genomic PCollection that is created from BigQuery to a
>>>>>> folder. Following is the code with output, so you can run it with any
>>>>>> small BQ table and let me know what your thoughts are:
>>>>
>>>> This init is only for debugging. In production I will use the pipeline
>>>> syntax.
>>>>
>>>>>> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},
>>>>>>         {u'index': u'GSM2316666', u'SNRPCP14': 0},
>>>>>>         {u'index': u'GSM2312355', u'SNRPCP14': 0},
>>>>>>         {u'index': u'GSM2312372', u'SNRPCP14': 0}]
>>>>>>
>>>>>> rows[1].keys()
>>>>>> # output: [u'index', u'SNRPCP14']
>>>>>>
>>>>>> # you can change `archs4.results_20180308_` to any other table name
>>>>>> # with an index column
>>>>>> queries2 = rows | beam.Map(lambda x: (
>>>>>>     beam.io.Read(beam.io.BigQuerySource(
>>>>>>         project='orielresearch-188115',
>>>>>>         use_standard_sql=False,
>>>>>>         query=str('SELECT * FROM `archs4.results_20180308_*` where index=\'%s\'' % (x["index"])))),
>>>>>>     str('gs://archs4/output/' + x["index"] + '/')))
>>>>>
>>>>> I don't think the above code will work (it is not portable across runners,
>>>>> at least). BigQuerySource (along with the Read transform) has to be applied
>>>>> to a Pipeline object. So probably change this to a for loop that creates a
>>>>> set of read transforms, and use Flatten to create a single PCollection.
>>>>
>>>> For debugging, I am running on the local Datalab runner. For production, I
>>>> will be running only the Dataflow runner. I think that I was able to query
>>>> the tables that way; I will double-check it. The indexes could go into the
>>>> millions - my concern is that I will not be able to leverage Beam's
>>>> distribution capability when I use the loop option. Any thoughts on that?
>>>
>>> You mean you'll have millions of queries. That will not be scalable. My
>>> suggestion was to loop over queries.
>>> Can you reduce it to one or a small number of queries and perform further
>>> processing in Beam?
>>>
>>>>>> queries2
>>>>>> # output: a list of PCollections and the path to write each
>>>>>> # PCollection's data to
>>>>>>
>>>>>> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>, 'gs://archs4/output/GSM2313641/'),
>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb950>, 'gs://archs4/output/GSM2316666/'),
>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>, 'gs://archs4/output/GSM2312355/'),
>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>, 'gs://archs4/output/GSM2312372/')]
>>>>>
>>>>> What you got here is a PCollection of PTransform objects, which is not
>>>>> useful.
>>>>>
>>>>>> *# this is my challenge*
>>>>>> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND COLUMN")
>>>>>
>>>>> Once you update the above code you will get a proper PCollection of
>>>>> elements read from BigQuery. You can transform and write this (to files,
>>>>> BQ, or any other sink) as needed.
>>>>
>>>> It is a list of tuples with a PCollection and the path to write to. The
>>>> path is not unique, and I might have more than one PCollection written to
>>>> the same destination. How do I pass the path from the tuple list as a
>>>> parameter to the text file name? Could you please add the code that you
>>>> were thinking about?
>>>
>>> The Python SDK does not support writing to different files based on the
>>> values of the data (dynamic writes). So you'll have to either partition the
>>> data into separate PCollections or write all the data to the same location.
>>>
>>> Here's *pseudocode* (untested) for reading from a few queries, partitioning
>>> into several PCollections, and writing to different destinations.
>>>
>>> queries = ['select * from A', 'select * from B', ...]
>>>
>>> p = Pipeline()
>>> pcollections = []
>>> for query in queries:
>>>     pc = p | beam.io.Read(beam.io.BigQuerySource(query=query))
>>>     pcollections.append(pc)
>>>
>>> all_data = pcollections | beam.Flatten()
>>> partitions = all_data | beam.Partition(my_partition_fn, num_partitions)
>>> for i, partition in enumerate(partitions):
>>>     partition | beam.io.WriteToText(<unique path for partition i>)
>>>
>>> Hope this helps.
>>>
>>> Thanks,
>>> Cham
>>>
>>>>> Please see the programming guide on how to write to text files (section
>>>>> 5.3, Python tab):
>>>>> https://beam.apache.org/documentation/programming-guide/
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>>> Do you have any idea how to sink the data to a text file? I have tried a
>>>>>> few other options and got stuck at the write transform.
>>>>>>
>>>>>> Any advice is very appreciated.
>>>>>>
>>>>>> Thanks,
>>>>>> Eila
>>>>>>
>>>>>> --
>>>>>> Eila
>>>>>> www.orielresearch.org
>>>>>> https://www.meetup.com/Deep-Learning-In-Production/