Hi,

The pipeline gets a list of filenames while it is being built (*.tar.gz
archives, each containing 4 files).

The first step of the pipeline reads every line of each file and returns a
PCollection where each element is a dict holding the content of every file
in the archive.

Example:

   - File file1.tar.gz contains file1_A.csv, file1_B.csv, file1_C.csv and
   file1_D.csv
   - The first step returns:
   {
       "file1_A.csv": [<LIST OF LINES IN file1_A.csv>],
       "file1_B.csv": [<LIST OF LINES IN file1_B.csv>],
       "file1_C.csv": [<LIST OF LINES IN file1_C.csv>],
       "file1_D.csv": [<LIST OF LINES IN file1_D.csv>]
   }

   => So the first step can be completely parallelized.
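Below is a minimal Python sketch of that first step, for illustration only. `read_archive` and `build_archive` are hypothetical helpers and not the author's code. The sketch assumes the archive bytes are already in memory (in the real pipeline they would be read from storage inside a DoFn) and that the CSV members are UTF-8 text. Since each archive is handled on its own, the same function can run on many archives in parallel.

```python
import io
import tarfile


def read_archive(data):
    """Return {member_name: [lines]} for every regular file in a .tar.gz.

    Assumes the members are UTF-8 text files (e.g. the four CSVs).
    """
    contents = {}
    with tarfile.open(fileobj=io.BytesIO(data), mode="r:gz") as archive:
        for member in archive.getmembers():
            if not member.isfile():
                continue  # skip directories, links and other entries
            raw = archive.extractfile(member).read()
            contents[member.name] = raw.decode("utf-8").splitlines()
    return contents


def build_archive(files):
    """Test helper: pack {name: text} into .tar.gz bytes in memory."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as archive:
        for name, text in files.items():
            payload = text.encode("utf-8")
            info = tarfile.TarInfo(name=name)
            info.size = len(payload)
            archive.addfile(info, io.BytesIO(payload))
    return buffer.getvalue()


archive_bytes = build_archive({
    "file1_A.csv": "1,10\n2,20\n",
    "file1_B.csv": "10,Paris\n20,Lyon\n",
})
result = read_archive(archive_bytes)
# {'file1_A.csv': ['1,10', '2,20'], 'file1_B.csv': ['10,Paris', '20,Lyon']}
```

The returned dict has the same shape as the example above: one key per file in the archive and a list of lines as its value.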


*Sébastien MORAND*
Team Lead Solution Architect
Technology & Operations / Digital Factory
Veolia - Group Information Systems & Technology (IS&T)
Cell.: +33 7 52 66 20 81 / Direct: +33 1 85 57 71 08
Bureau 0144C (Ouest)
30, rue Madeleine-Vionnet - 93300 Aubervilliers, France
*www.veolia.com <http://www.veolia.com>*
<http://www.veolia.com>
<https://www.facebook.com/veoliaenvironment/>
<https://www.youtube.com/user/veoliaenvironnement>
<https://www.linkedin.com/company/veolia-environnement>
<https://twitter.com/veolia>

On 11 June 2017 at 04:01, Eugene Kirpichov <[email protected]> wrote:

> Hi Sebastien,
> Can you tell more about how your "step 1" works? I looked at the logs of
> your job and it's taking suspiciously long (~20 minutes) to produce the
> ~400 elements, and it's doing that sequentially. Is it possible to
> parallelize step 1?
>
> On Sat, Jun 10, 2017 at 5:53 PM Lukasz Cwik <[email protected]> wrote:
>
>> The Dataflow implementation when executing a batch pipeline does not
>> parallelize dependent fused segments irrespective of the windowing function
>> so #1 will fully execute before #2 starts.
>>
>> On Sat, Jun 10, 2017 at 3:48 PM, Morand, Sebastien <
>> [email protected]> wrote:
>>
>>> Hi again,
>>>
>>> So it scales. Now my pipeline is running in two parts:
>>>
>>>    1. Reading the file contents (~400) and then a GroupByKey
>>>
>>>    2. From the GroupByKey, transforming and writing into BigQuery (~50M)
>>>
>>> Part 2 scales as expected. Part 1 takes about 25 minutes on my files and
>>> part 2 about 35 minutes with scaling. But what if I want to use windowing
>>> so that the second part starts sooner and the processing is more parallel?
>>>
>>> I tried adding 60-second FixedWindows but it doesn't work (Job
>>> ID: 2017-06-06_04_36_01-9894155361321571250)
>>>
>>> Regards,
>>>
>>>
>>> *Sébastien MORAND*
>>>
>>> On 6 June 2017 at 01:24, Morand, Sebastien <[email protected]>
>>> wrote:
>>>
>>>> Fine, it scales... Thank you very much.
>>>>
>>>> *Sébastien MORAND*
>>>>
>>>> On 6 June 2017 at 00:31, Morand, Sebastien <[email protected]>
>>>> wrote:
>>>>
>>>>> Thank you Eugene,
>>>>>
>>>>> I'm trying Sourabh's approach (and yours, since it looks like the same
>>>>> idea) and will let you know if it's better.
>>>>>
>>>>> Regards,
>>>>>
>>>>> *Sébastien MORAND*
>>>>>
>>>>> On 5 June 2017 at 23:31, Eugene Kirpichov <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I looked at the job ID you quoted, and yes, it suffers from excessive
>>>>>> fusion. I wish we had tooling to automatically detect that and emit a
>>>>>> warning, but we don't have that yet.
>>>>>>
>>>>>> Here's an example of how you can break fusion:
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L326-L339
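The linked JdbcIO code breaks fusion, roughly, by pairing each element with a random key, applying a GroupByKey, and then dropping the keys and flattening the groups. The sketch below imitates that add-key / GroupByKey / ungroup sequence in plain Python so the data flow is easy to follow. It is a local simulation, not Beam code, and the function names are hypothetical. Later releases of the Beam Python SDK package the same idea as `beam.Reshuffle()`; whether it is available in the SDK version used in this thread is an assumption to verify.

```python
import random
from collections import defaultdict


def add_random_keys(elements, rng):
    """Pair every element with a random integer key."""
    return [(rng.randrange(2**31), element) for element in elements]


def group_by_key(pairs):
    """Local stand-in for GroupByKey: collect the values for each key.

    On Dataflow, a GroupByKey is a shuffle boundary: the steps before and
    after it are not fused, so the work after it can be spread over workers.
    """
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


def ungroup(groups):
    """Drop the keys and flatten the grouped values back into elements."""
    return [value for values in groups.values() for value in values]


def break_fusion(elements, seed=None):
    """Random key -> GroupByKey -> ungroup, as in the JdbcIO pattern."""
    rng = random.Random(seed)
    return ungroup(group_by_key(add_random_keys(elements, rng)))


shuffled = break_fusion(list(range(10)), seed=42)
```

The data itself is unchanged (same elements, possibly in a different order). The only point of the round trip is the stage boundary it introduces.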
>>>>>>
>>>>>> On Mon, Jun 5, 2017 at 1:27 PM Sourabh Bajaj <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes you're correct.
>>>>>>>
>>>>>>> On Mon, Jun 5, 2017 at 1:23 PM Morand, Sebastien <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> The numbers in parentheses after each step are the number of output
>>>>>>>> records.
>>>>>>>>
>>>>>>>> When I ungroup, do I send the 200 records again, and not the 20M?
>>>>>>>>
>>>>>>>> Shouldn't I do this instead:
>>>>>>>> Read (200) -> GroupByKey (200) -> UnGroup (20M) -> combine (20M) ->
>>>>>>>> clean (20M) -> filter (20M) -> insert
>>>>>>>>
>>>>>>>>
>>>>>>>> *Sébastien MORAND*
>>>>>>>>
>>>>>>>> On 5 June 2017 at 22:17, Sourabh Bajaj <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I think you need to do something like:
>>>>>>>>>
>>>>>>>>> Read (200) -> GroupByKey (200) -> UnGroup (200) [Note: this can run
>>>>>>>>> on 200 different workers] -> combine (20M) -> clean (20M) ->
>>>>>>>>> filter (20M) -> insert
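A toy model of why the position of the GroupByKey matters in the suggestion above. Everything here is an assumption for illustration: the 20M rows are split evenly over the 200 archive records, the unshuffled (fused) stage is modeled as running on a single worker, as reported in this thread, and after the shuffle each record goes to worker `key % num_workers`, a hypothetical stand-in for however the runner really routes keys.

```python
import random
from collections import Counter


def fan_out_load(lines_per_archive, num_workers, reshuffled, seed=0):
    """Toy model: how many output rows each worker produces in the fan-out step.

    lines_per_archive: rows each archive record expands into (assumed known).
    reshuffled=False models the fused case, where everything stays on worker 0.
    reshuffled=True gives each archive record a random key and sends it to
    worker key % num_workers (a hypothetical stand-in for real key routing).
    """
    rng = random.Random(seed)
    load = Counter()
    for lines in lines_per_archive:
        worker = rng.randrange(2**31) % num_workers if reshuffled else 0
        load[worker] += lines
    return [load[worker] for worker in range(num_workers)]


# Assumption: ~20M rows spread evenly over 200 archives.
lines_per_archive = [100_000] * 200
fused = fan_out_load(lines_per_archive, num_workers=10, reshuffled=False)
spread = fan_out_load(lines_per_archive, num_workers=10, reshuffled=True)
print(fused[0], max(spread), sum(spread))
```

With the shuffle placed before the fan-out, only the 200 archive records cross the shuffle, yet the expansion to 20M rows happens on whichever workers received them.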
>>>>>>>>>
>>>>>>>>> On Mon, Jun 5, 2017 at 1:14 PM Morand, Sebastien <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Yes fusion looks like my problem. A job ID to look at:
>>>>>>>>>> 2017-06-05_10_14_25-5856213199384263626.
>>>>>>>>>>
>>>>>>>>>> The point is in your link:
>>>>>>>>>> <<
>>>>>>>>>> For example, one case in which fusion can limit Dataflow's ability
>>>>>>>>>> to optimize worker usage is a "high fan-out" ParDo. In such an
>>>>>>>>>> operation, you might have an input collection with relatively few
>>>>>>>>>> elements, but the ParDo produces an output with hundreds or
>>>>>>>>>> thousands of times as many elements, followed by another ParDo
>>>>>>>>>> >>
>>>>>>>>>>
>>>>>>>>>> This is exactly what I'm doing in the step
>>>>>>>>>> transform-combine-7d5ad942 in the above job id.
>>>>>>>>>>
>>>>>>>>>> As far as I understand, I should add a GroupByKey on a unique field
>>>>>>>>>> after transform-combine-7d5ad942 and then ungroup the data? (Meaning
>>>>>>>>>> I add two operations to the pipeline to help the workers?)
>>>>>>>>>>
>>>>>>>>>> Right now:
>>>>>>>>>> Read (200) -> combine (20M) -> clean (20M) -> filter (20M) ->
>>>>>>>>>> insert
>>>>>>>>>>
>>>>>>>>>> Will become:
>>>>>>>>>> Read (200) -> combine (20M) -> GroupByKey (20M) -> ungroup (20M)
>>>>>>>>>> -> clean (20M) -> filter (20M) -> insert
>>>>>>>>>>
>>>>>>>>>> Is this the right way?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *Sébastien MORAND*
>>>>>>>>>>
>>>>>>>>>> On 5 June 2017 at 21:42, Eugene Kirpichov <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Do you have a Dataflow job ID to look at?
>>>>>>>>>>> It might be due to fusion:
>>>>>>>>>>> https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Jun 5, 2017 at 12:13 PM Prabeesh K. <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Please try using *--worker_machine_type* n1-standard-4 or
>>>>>>>>>>>> n1-standard-8
>>>>>>>>>>>>
>>>>>>>>>>>> On 5 June 2017 at 23:08, Morand, Sebastien <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I have a problem testing scaling on Dataflow.
>>>>>>>>>>>>>
>>>>>>>>>>>>> My pipeline is pretty simple: I get a list of files from Pub/Sub,
>>>>>>>>>>>>> so the number of files that will feed the pipeline is known from
>>>>>>>>>>>>> the beginning. Here are my steps, assuming I have 200 files
>>>>>>>>>>>>> containing about 20,000,000 records:
>>>>>>>>>>>>>
>>>>>>>>>>>>>    - *First step:* Read the file contents from storage. The files
>>>>>>>>>>>>>    are .tar.gz archives, each containing 4 files (CSV). I return the
>>>>>>>>>>>>>    whole content of an archive as one record.
>>>>>>>>>>>>>    *OUT:* 200 records (one per archive, containing the data of all
>>>>>>>>>>>>>    4 files). Basically it's a dict: {file1: content_of_file1,
>>>>>>>>>>>>>    file2: content_of_file2, etc.}
>>>>>>>>>>>>>
>>>>>>>>>>>>>    - *Second step:* Join the data of the 4 files into one record
>>>>>>>>>>>>>    (the main file contains foreign keys used to look up information
>>>>>>>>>>>>>    in the other files).
>>>>>>>>>>>>>    *OUT:* 20,000,000 records, one for every line in the files. Each
>>>>>>>>>>>>>    record is a list of strings.
>>>>>>>>>>>>>
>>>>>>>>>>>>>    - *Third step:* Clean the data (convert values to prepare the
>>>>>>>>>>>>>    BigQuery load) and turn each record into a dict whose keys are
>>>>>>>>>>>>>    the BigQuery column names.
>>>>>>>>>>>>>    *OUT:* 20,000,000 records, each a dict.
>>>>>>>>>>>>>
>>>>>>>>>>>>>    - *Fourth step:* Insert into BigQuery.
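A plain-Python sketch of the second and third steps, under hypothetical assumptions: the CSV files have no header rows, column 0 of each lookup file is its key, `fk_index` is the position of the matching foreign key in the main file, and the BigQuery column names and converters are supplied by the caller. `join_archive` and `clean` are illustrative names, not the author's code, and treating `file1_A.csv` as the main file is an assumption. The join is the high fan-out part: one archive record in, one row out per line of the main file.

```python
import csv


def join_archive(archive, main_file, lookups):
    """Step 2 sketch: expand one archive record into joined rows.

    archive:   {file_name: [csv_lines]}, as produced by step 1.
    main_file: name of the file holding the foreign keys (hypothetical).
    lookups:   [(lookup_file, fk_index)]: column fk_index of the main file
               matches column 0 of lookup_file (hypothetical layout).
    Yields one list of strings per line of the main file.
    """
    # Build an in-memory index per lookup file: key -> remaining columns.
    indexes = []
    for lookup_file, fk_index in lookups:
        index = {row[0]: row[1:] for row in csv.reader(archive[lookup_file])}
        indexes.append((fk_index, index))

    for row in csv.reader(archive[main_file]):
        joined = list(row)
        for fk_index, index in indexes:
            # A missing key raises KeyError; a real pipeline needs a policy
            # (skip, default values, or a separate error output).
            joined.extend(index[row[fk_index]])
        yield joined


def clean(joined, columns, converters):
    """Step 3 sketch: turn a joined row into {BigQuery column name: value}.

    columns:    column names in the same order as the joined row.
    converters: {column: callable} for non-string columns (e.g. int).
    Empty strings become None, which loads as NULL in a NULLABLE column.
    """
    record = {}
    for name, raw in zip(columns, joined):
        if raw == "":
            record[name] = None
        else:
            record[name] = converters.get(name, str)(raw)
    return record


archive = {
    "file1_A.csv": ["1,10,2017-06-05", "2,20,"],
    "file1_B.csv": ["10,Paris", "20,Lyon"],
}
rows = list(join_archive(archive, "file1_A.csv", [("file1_B.csv", 1)]))
columns = ["id", "site_id", "day", "site_name"]
records = [clean(row, columns, {"id": int, "site_id": int}) for row in rows]
```

Keeping the join and the cleaning as separate functions mirrors the separate steps described above, so either can be moved after a shuffle independently.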
>>>>>>>>>>>>>
>>>>>>>>>>>>> So the first step returns 200 records, but I have 20,000,000
>>>>>>>>>>>>> records to insert. This takes about an hour and a half and always
>>>>>>>>>>>>> uses 1 worker...
>>>>>>>>>>>>>
>>>>>>>>>>>>> If I manually set the number of workers, it's not really faster.
>>>>>>>>>>>>> So for some unknown reason it doesn't scale. Any ideas how to fix
>>>>>>>>>>>>> this?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for any help.
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Sébastien MORAND*
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>> --------------------------------
>>>>>>>>>>>>> This e-mail transmission (message and any attached files) may
>>>>>>>>>>>>> contain information that is proprietary, privileged and/or 
>>>>>>>>>>>>> confidential to
>>>>>>>>>>>>> Veolia Environnement and/or its affiliates and is intended 
>>>>>>>>>>>>> exclusively for
>>>>>>>>>>>>> the person(s) to whom it is addressed. If you are not the intended
>>>>>>>>>>>>> recipient, please notify the sender by return e-mail and delete 
>>>>>>>>>>>>> all copies
>>>>>>>>>>>>> of this e-mail, including all attachments. Unless expressly 
>>>>>>>>>>>>> authorized, any
>>>>>>>>>>>>> use, disclosure, publication, retransmission or dissemination of 
>>>>>>>>>>>>> this
>>>>>>>>>>>>> e-mail and/or of its attachments is strictly prohibited.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Ce message electronique et ses fichiers attaches sont
>>>>>>>>>>>>> strictement confidentiels et peuvent contenir des elements dont 
>>>>>>>>>>>>> Veolia
>>>>>>>>>>>>> Environnement et/ou l'une de ses entites affiliees sont 
>>>>>>>>>>>>> proprietaires. Ils
>>>>>>>>>>>>> sont donc destines a l'usage de leurs seuls destinataires. Si 
>>>>>>>>>>>>> vous avez
>>>>>>>>>>>>> recu ce message par erreur, merci de le retourner a son emetteur 
>>>>>>>>>>>>> et de le
>>>>>>>>>>>>> detruire ainsi que toutes les pieces attachees. L'utilisation, la
>>>>>>>>>>>>> divulgation, la publication, la distribution, ou la reproduction 
>>>>>>>>>>>>> non
>>>>>>>>>>>>> expressement autorisees de ce message et de ses pieces attachees 
>>>>>>>>>>>>> sont
>>>>>>>>>>>>> interdites.
>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>> --------------------------------
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>
>>>
>>>
>>
>>
