I looked at the job ID you quoted, and yes, it suffers from excessive fusion. I wish we had tooling to automatically detect that and emit a warning, but we don't have that yet.

Here's an example of how you can break fusion: https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L326-L339
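The linked JdbcIO code is Java; as a rough sketch of the same idea for the Python SDK (which this thread appears to be using), you pair every element with a random key, GroupByKey, and then re-emit the values, so the runner cannot fuse the producing step with whatever comes after it. The transform name BreakFusion and the key range below are made up for illustration, not part of any SDK:

import random

import apache_beam as beam


class BreakFusion(beam.PTransform):
    """Illustrative fusion-breaking step, mirroring the linked JdbcIO pattern.

    Pair each element with a random key, GroupByKey (which forces a
    materialization point between fused stages), then drop the keys again.
    """

    def expand(self, pcoll):
        return (
            pcoll
            | "AddRandomKey" >> beam.Map(lambda value: (random.randint(0, 999), value))
            | "GroupByRandomKey" >> beam.GroupByKey()
            | "DropKey" >> beam.FlatMap(lambda kv: kv[1])
        )

In a pipeline it sits between the step whose output you want redistributed and the steps downstream of it, e.g. ... | "Prevent fusion" >> BreakFusion() | ...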
On Mon, Jun 5, 2017 at 1:27 PM Sourabh Bajaj <[email protected]> wrote:

> Yes, you're correct.
>
> On Mon, Jun 5, 2017 at 1:23 PM Morand, Sebastien <[email protected]> wrote:
>
>> By the number in parentheses after each step I meant the number of records in its output.
>>
>> When I ungroup, do I send the 200 records again, not the 20M?
>>
>> Shouldn't I do this instead:
>> Read (200) -> GroupByKey (200) -> UnGroup (20M) -> combine (20M) -> clean (20M) -> filter (20M) -> insert
>>
>> On 5 June 2017 at 22:17, Sourabh Bajaj <[email protected]> wrote:
>>
>>> I think you need to do something like:
>>>
>>> Read (200) -> GroupByKey (200) -> UnGroup (200) [note: this can be on 200 different workers] -> combine (20M) -> clean (20M) -> filter (20M) -> insert
>>>
>>> On Mon, Jun 5, 2017 at 1:14 PM Morand, Sebastien <[email protected]> wrote:
>>>
>>>> Yes, fusion looks like my problem. A job ID to look at: 2017-06-05_10_14_25-5856213199384263626.
>>>>
>>>> The point is in your link:
>>>> << For example, one case in which fusion can limit Dataflow's ability to optimize worker usage is a "high fan-out" ParDo. In such an operation, you might have an input collection with relatively few elements, but the ParDo produces an output with hundreds or thousands of times as many elements, followed by another ParDo. >>
>>>>
>>>> This is exactly what I'm doing in the step transform-combine-7d5ad942 in the above job ID.
>>>>
>>>> As far as I understand, I should create a GroupByKey after transform-combine-7d5ad942 on a unique field and then ungroup the data? (Meaning I add two operations to the pipeline to help the workers.)
>>>>
>>>> Right now:
>>>> Read (200) -> combine (20M) -> clean (20M) -> filter (20M) -> insert
>>>>
>>>> Will become:
>>>> Read (200) -> combine (20M) -> GroupByKey (20M) -> ungroup (20M) -> clean (20M) -> filter (20M) -> insert
>>>>
>>>> Is this the right way?
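As a minimal illustration of the second shape described just above (Read -> combine -> GroupByKey -> ungroup -> clean -> filter -> insert), here is a Python sketch in which every step body is a placeholder; it only shows where the GroupByKey/ungroup pair would sit after the high fan-out step, and is not a statement about which of the two discussed placements is the right one:

import random

import apache_beam as beam

# Placeholder steps throughout; only the position of the GroupByKey/ungroup
# pair after the high fan-out "combine" step is the point of this sketch.
with beam.Pipeline() as p:
    (
        p
        | "Read" >> beam.Create(["file-%03d" % i for i in range(200)])
        | "Combine" >> beam.FlatMap(lambda f: ({"file": f, "row": i} for i in range(1000)))
        | "AddRandomKey" >> beam.Map(lambda rec: (random.randint(0, 999), rec))
        | "GroupByKey" >> beam.GroupByKey()            # forces a materialization point
        | "Ungroup" >> beam.FlatMap(lambda kv: kv[1])  # back to individual records
        | "Clean" >> beam.Map(lambda rec: rec)         # placeholder for the real cleaning
        | "Filter" >> beam.Filter(lambda rec: True)    # placeholder for the real filter
        | "Insert" >> beam.Map(print)                  # stands in for the BigQuery insert
    )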
>>>> On 5 June 2017 at 21:42, Eugene Kirpichov <[email protected]> wrote:
>>>>
>>>>> Do you have a Dataflow job ID to look at? It might be due to fusion:
>>>>> https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
>>>>>
>>>>> On Mon, Jun 5, 2017 at 12:13 PM Prabeesh K. <[email protected]> wrote:
>>>>>
>>>>>> Please try using *--worker_machine_type* n1-standard-4 or n1-standard-8.
>>>>>>
>>>>>> On 5 June 2017 at 23:08, Morand, Sebastien <[email protected]> wrote:
>>>>>>
>>>>>>> I have a problem with my attempts to test scaling on Dataflow.
>>>>>>>
>>>>>>> My dataflow is pretty simple: I get a list of files from Pub/Sub, so the number of files I'm going to feed into the flow is well known at the beginning. Let's say I have 200 files containing about 20,000,000 records in total. Here are my steps:
>>>>>>>
>>>>>>> - *First step:* Read the file contents from storage. The files are .tar.gz archives, each containing 4 CSV files. I return the whole archive content as one record.
>>>>>>> *OUT:* 200 records (one per archive, holding the data of all 4 files). Basically it's a dict: {file1: content_of_file1, file2: content_of_file2, etc.}
>>>>>>>
>>>>>>> - *Second step:* Join the data of the 4 files into one record per line (the main file contains foreign keys used to pull information from the other files).
>>>>>>> *OUT:* 20,000,000 records, one for every line in the files. Each record is a list of strings.
>>>>>>>
>>>>>>> - *Third step:* Clean the data (convert it to prepare the load into BigQuery) and turn each record into a dict whose keys are the BigQuery column names.
>>>>>>> *OUT:* 20,000,000 records, one dict per input record.
>>>>>>>
>>>>>>> - *Fourth step:* Insert into BigQuery.
>>>>>>>
>>>>>>> So the first step returns 200 records, but I have 20,000,000 records to insert. This takes about an hour and a half and always uses 1 worker...
>>>>>>>
>>>>>>> If I manually set the number of workers, it's not really faster. So for some unknown reason it doesn't scale. Any ideas how to make it scale?
>>>>>>>
>>>>>>> Thanks for any help.
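For reference, a small, self-contained Python sketch of the four-step shape described above, with placeholder logic everywhere (synthetic archive names, a synthetic fan-out instead of the real .tar.gz/CSV join, and print standing in for the BigQuery load). The --worker_machine_type flag is the option Prabeesh suggests; the other usual Dataflow flags (--runner, --project, --temp_location, --region) are omitted here:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Only the shape matters: a small read (200 archives) followed by a huge
# fan-out (millions of rows), i.e. exactly the "high fan-out ParDo" case.
options = PipelineOptions([
    "--worker_machine_type=n1-standard-4",  # Prabeesh's suggestion; only applies on Dataflow
])

def join_archive(path):
    # Stand-in for step 2: the real code joins the 4 CSV files inside each
    # .tar.gz archive; here each archive just fans out into synthetic rows.
    for i in range(1000):
        yield [path, str(i)]

def to_bq_row(fields):
    # Stand-in for step 3: list of strings -> dict keyed by BigQuery column name.
    return {"source_file": fields[0], "line_number": fields[1]}

with beam.Pipeline(options=options) as p:
    (
        p
        | "ReadFileList" >> beam.Create(["archive-%03d.tar.gz" % i for i in range(200)])  # step 1 stand-in
        | "JoinFiles" >> beam.FlatMap(join_archive)   # step 2: the high fan-out
        | "Clean" >> beam.Map(to_bq_row)              # step 3
        | "Insert" >> beam.Map(print)                 # step 4 stand-in for WriteToBigQuery
    )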
