Hi again,

So it scales. My pipeline now runs in two parts:

   1. Read the file contents (~400 files), then GroupByKey

   2. After the GroupByKey, transform and write to BigQuery (~50M records)
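
For reference, the shape of these two parts can be modelled in plain Python. This is only a sketch, not Beam code: the file names, the tiny sizes, and the `expand` helper are made up for illustration. It shows that keying each file record uniquely, grouping, and ungrouping hands back the same records; as I understand it, the GroupByKey in the real pipeline only acts as a boundary between the small read step and the large fan-out step.

```python
from collections import defaultdict


def read_files(n_files, rows_per_file):
    """Part 1: one record per file, holding the whole file content."""
    return [
        (f"file-{i}", [f"file-{i},row-{j}" for j in range(rows_per_file)])
        for i in range(n_files)
    ]


def group_by_key(keyed_records):
    """Collect values under their key, the way a GroupByKey would."""
    groups = defaultdict(list)
    for key, value in keyed_records:
        groups[key].append(value)
    return dict(groups)


def ungroup(groups):
    """Emit every grouped value again, dropping the key."""
    return [value for values in groups.values() for value in values]


def expand(file_content):
    """Part 2 (hypothetical helper): fan one file record out into row records."""
    return [line.split(",") for line in file_content]


files = read_files(n_files=4, rows_per_file=3)  # few, large records
grouped = group_by_key(files)                   # unique keys: one value per key
regrouped = ungroup(grouped)                    # still one record per file
rows = [row for content in regrouped for row in expand(content)]  # many records

print(len(files), len(regrouped), len(rows))  # prints: 4 4 12
```

The record count only grows in the last line, which is the step that benefits from being spread over many workers.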

Part 2 scales as expected. Part 1 takes about 25 minutes on my files, and
part 2 about 35 minutes while scaling. But what if I want to apply windowing
so that the second part starts sooner and the process runs more in parallel?

I tried adding a 60-second FixedWindow, but it's not working (Job ID:
2017-06-06_04_36_01-9894155361321571250).
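
To make the 60-second window concrete, here is a minimal plain-Python sketch of how fixed windows bucket records by timestamp before grouping. It is a model, not Beam code, and the records and timestamps (in seconds) are invented. The point it illustrates is that grouping happens per key and per window, so records are only separated when their timestamps fall into different 60-second intervals; records that all carry the same timestamp end up in a single window.

```python
WINDOW_SIZE = 60  # seconds, matching the 60-second fixed window


def fixed_window(timestamp, size=WINDOW_SIZE):
    """Return the [start, end) interval of the fixed window containing timestamp."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


def group_by_key_and_window(timestamped_records, size=WINDOW_SIZE):
    """Group values by (key, window) instead of by key alone."""
    groups = {}
    for key, value, timestamp in timestamped_records:
        groups.setdefault((key, fixed_window(timestamp, size)), []).append(value)
    return groups


# Hypothetical (key, value, timestamp) records.
records = [("a", "r1", 5), ("a", "r2", 59), ("a", "r3", 60), ("b", "r4", 61)]
groups = group_by_key_and_window(records)

for (key, window), values in sorted(groups.items()):
    print(key, window, values)
```

Here key "a" is split across the windows (0, 60) and (60, 120) because "r3" has a timestamp of 60, while "r1" and "r2" stay together.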

Regards,


*Sébastien MORAND*
Team Lead Solution Architect
Technology & Operations / Digital Factory
Veolia - Group Information Systems & Technology (IS&T)
Cell.: +33 7 52 66 20 81 / Direct: +33 1 85 57 71 08
Bureau 0144C (Ouest)
30, rue Madeleine-Vionnet - 93300 Aubervilliers, France
*www.veolia.com <http://www.veolia.com>*
<http://www.veolia.com>
<https://www.facebook.com/veoliaenvironment/>
<https://www.youtube.com/user/veoliaenvironnement>
<https://www.linkedin.com/company/veolia-environnement>
<https://twitter.com/veolia>

On 6 June 2017 at 01:24, Morand, Sebastien <[email protected]>
wrote:

> Fine, it scales ... Thank you very much.
>
> On 6 June 2017 at 00:31, Morand, Sebastien <[email protected]>
> wrote:
>
>> Thank you Eugene,
>>
>> I'm trying Sourabh's approach (and yours, since it looks like the same
>> idea) and will let you know if it's better.
>>
>> Regards,
>>
>> On 5 June 2017 at 23:31, Eugene Kirpichov <[email protected]> wrote:
>>
>>> I looked at the job ID you quoted, and yes, it suffers from excessive
>>> fusion. I wish we had tooling to automatically detect that and emit a
>>> warning, but we don't have that yet.
>>>
>>> Here's an example of how you can break fusion:
>>> https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L326-L339
>>>
>>> On Mon, Jun 5, 2017 at 1:27 PM Sourabh Bajaj <[email protected]>
>>> wrote:
>>>
>>>> Yes you're correct.
>>>>
>>>> On Mon, Jun 5, 2017 at 1:23 PM Morand, Sebastien <
>>>> [email protected]> wrote:
>>>>
>>>>> The number in parentheses after each step is the number of output
>>>>> records.
>>>>>
>>>>> When I ungroup, do I emit the 200 records again, not the 20M?
>>>>>
>>>>> Shouldn't I do this instead:
>>>>> Read (200)  -> GroupByKey (200) -> UnGroup(20M) -> combine (20M) ->
>>>>> clean (20M) -> filter (20M) -> insert
>>>>>
>>>>> On 5 June 2017 at 22:17, Sourabh Bajaj <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I think you need to do something like:
>>>>>>
>>>>>> Read (200) -> GroupByKey (200) -> UnGroup (200) [Note: this can run on
>>>>>> 200 different workers] -> combine (20M) -> clean (20M) -> filter
>>>>>> (20M) -> insert
>>>>>>
>>>>>> On Mon, Jun 5, 2017 at 1:14 PM Morand, Sebastien <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Yes fusion looks like my problem. A job ID to look at:
>>>>>>> 2017-06-05_10_14_25-5856213199384263626.
>>>>>>>
>>>>>>> The key point in your link is:
>>>>>>> <<
>>>>>>> For example, one case in which fusion can limit Dataflow's ability
>>>>>>> to optimize worker usage is a "high fan-out" ParDo. In such an
>>>>>>> operation, you might have an input collection with relatively few
>>>>>>> elements, but the ParDo produces an output with hundreds or thousands
>>>>>>> of times as many elements, followed by another ParDo
>>>>>>> >>
>>>>>>>
>>>>>>> This is exactly what I'm doing in the step
>>>>>>> transform-combine-7d5ad942 in the above job id.
>>>>>>>
>>>>>>> As far as I understand, I should add a GroupByKey on a unique field
>>>>>>> after transform-combine-7d5ad942 and then ungroup the data (meaning I
>>>>>>> add two operations to the pipeline to help the workers)?
>>>>>>>
>>>>>>> Right now:
>>>>>>> Read (200) -> combine (20M) -> clean (20M) -> filter (20M) -> insert
>>>>>>>
>>>>>>> Will become:
>>>>>>> Read (200) -> combine (20M) -> GroupByKey (20M) -> ungroup (20M) ->
>>>>>>> clean (20M) -> filter (20M) -> insert
>>>>>>>
>>>>>>> Is this the right way?
>>>>>>>
>>>>>>> On 5 June 2017 at 21:42, Eugene Kirpichov <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Do you have a Dataflow job ID to look at?
>>>>>>>> It might be due to fusion:
>>>>>>>> https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
>>>>>>>>
>>>>>>>> On Mon, Jun 5, 2017 at 12:13 PM Prabeesh K. <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Please try using *--worker_machine_type* n1-standard-4 or
>>>>>>>>> n1-standard-8
>>>>>>>>>
>>>>>>>>> On 5 June 2017 at 23:08, Morand, Sebastien <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> I have a problem when trying to test scaling on Dataflow.
>>>>>>>>>>
>>>>>>>>>> My pipeline is pretty simple: I get a list of files from Pub/Sub,
>>>>>>>>>> so the number of files feeding the flow is known at the beginning.
>>>>>>>>>> Let's say I have 200 files containing about 20,000,000 records.
>>>>>>>>>> Here are my steps:
>>>>>>>>>>
>>>>>>>>>>    - *First step:* Read file contents from storage. Each file is a
>>>>>>>>>>    .tar.gz containing 4 CSV files. I return the whole file content
>>>>>>>>>>    as one record.
>>>>>>>>>>    *OUT:* 200 records (one per file, containing the data of all 4
>>>>>>>>>>    files). Basically it's a dict: {file1: content_of_file1, file2:
>>>>>>>>>>    content_of_file2, etc...}
>>>>>>>>>>
>>>>>>>>>>    - *Second step:* Join the data of the 4 files into one record
>>>>>>>>>>    (the main file contains foreign keys to look up information in
>>>>>>>>>>    the other files).
>>>>>>>>>>    *OUT:* 20,000,000 records, one for every line in the files.
>>>>>>>>>>    Each record is a list of strings.
>>>>>>>>>>
>>>>>>>>>>    - *Third step:* Clean the data (convert values to prepare for
>>>>>>>>>>    loading into BigQuery) and store each record as a dict whose keys
>>>>>>>>>>    are the BigQuery column names.
>>>>>>>>>>    *OUT:* 20,000,000 records, each a dict.
>>>>>>>>>>
>>>>>>>>>>    - *Fourth step:* Insert into BigQuery.
>>>>>>>>>>
>>>>>>>>>> So the first step returns 200 records, but I have 20,000,000
>>>>>>>>>> records to insert.
>>>>>>>>>> This takes about an hour and a half and always uses 1 worker ...
>>>>>>>>>>
>>>>>>>>>> If I manually set the number of workers, it's not really faster.
>>>>>>>>>> So for an unknown reason, it doesn't scale. Any ideas how to fix it?
>>>>>>>>>>
>>>>>>>>>> Thanks for any help.
>>>>>>>>>>
>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>> --------------------------------
>>>>>>>>>> This e-mail transmission (message and any attached files) may
>>>>>>>>>> contain information that is proprietary, privileged and/or
>>>>>>>>>> confidential to Veolia Environnement and/or its affiliates and is
>>>>>>>>>> intended exclusively for the person(s) to whom it is addressed. If
>>>>>>>>>> you are not the intended recipient, please notify the sender by
>>>>>>>>>> return e-mail and delete all copies of this e-mail, including all
>>>>>>>>>> attachments. Unless expressly authorized, any use, disclosure,
>>>>>>>>>> publication, retransmission or dissemination of this e-mail and/or
>>>>>>>>>> of its attachments is strictly prohibited.
>>>>>>>>>>
>>>>>>>>>> Ce message electronique et ses fichiers attaches sont strictement
>>>>>>>>>> confidentiels et peuvent contenir des elements dont Veolia
>>>>>>>>>> Environnement et/ou l'une de ses entites affiliees sont
>>>>>>>>>> proprietaires. Ils sont donc destines a l'usage de leurs seuls
>>>>>>>>>> destinataires. Si vous avez recu ce message par erreur, merci de le
>>>>>>>>>> retourner a son emetteur et de le detruire ainsi que toutes les
>>>>>>>>>> pieces attachees. L'utilisation, la divulgation, la publication, la
>>>>>>>>>> distribution, ou la reproduction non expressement autorisees de ce
>>>>>>>>>> message et de ses pieces attachees sont interdites.
>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>> --------------------------------
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>
>
