Thanks for the advice. I have already read that guide, but it didn’t really
make things simpler for me.

In the end, I rewrote the whole pipeline in pure Python, and it became 8x
faster on the same set of raw data (DirectRunner: 16 minutes, plain Python: 2
minutes).
It’s not as nice and distributed as a Beam pipeline, but it can also be
executed on a Google Compute Engine instance, so no network charges apply.

--
Bests,
Andor

From: Lukasz Cwik [mailto:[email protected]]
Sent: Friday, August 24, 2018 20:58
To: [email protected]
Subject: Re: Window bounded to unbounded PCollection

Python has a very limited number of sources that can be used to create an
unbounded pipeline. There is some documentation here about the various limits:
https://beam.apache.org/documentation/sdks/python-streaming/

To achieve what you're looking for, use ReadFromPubSub and a ParDo that reads
the filename and parses all the records that you want. You will also need to
write a ParDo that outputs the data to files.
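
Roughly, the shape would be something like the sketch below. This is only
illustrative: the topic string, ReadFileRecordsDoFn and WriteRecordDoFn are
placeholders you would replace with your own logic.

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions


class ReadFileRecordsDoFn(beam.DoFn):
    # Hypothetical DoFn: each Pub/Sub message is assumed to carry a gs:// path.
    def process(self, message):
        path = message.decode('utf-8').strip()
        with FileSystems.open(path) as f:
            for line in f:
                yield line.decode('utf-8').rstrip('\n')


class WriteRecordDoFn(beam.DoFn):
    # Hypothetical DoFn: write each record to your own destination.
    def process(self, record):
        ...


options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as p:
    (p
     | 'read_paths' >> beam.io.ReadFromPubSub(topic='projects/<project>/topics/<topic>')
     | 'parse_files' >> beam.ParDo(ReadFileRecordsDoFn())
     | 'write' >> beam.ParDo(WriteRecordDoFn()))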


On Fri, Aug 24, 2018 at 11:47 AM Tóth Andor <[email protected]> wrote:
I can deal with being less efficient if it’s more transparent :). I want to
achieve the latter, unbounded scenario. How can that be done?

--
Bests,
Andor

From: Lukasz Cwik [mailto:[email protected]]
Sent: Friday, August 24, 2018 19:58
To: [email protected]
Subject: Re: Window bounded to unbounded PCollection

In a bounded pipeline, "GroupByKey waits for all data to arrive, and no output
files appear until the end" is the expected behavior. Runners like Dataflow,
Spark, ... process the transform tree in topological order. In your pipeline,
this would mean that all of the data is read and written into the group by key.
Once that is complete, all of the data is read from the group by key and then
written to the output files. This is done because it is very efficient.

If you treat this as an unbounded pipeline, then all transforms run in
parallel and partial progress is made continuously, but this is significantly
less efficient than the above.

On Fri, Aug 24, 2018 at 12:39 AM Tóth Andor <[email protected]> wrote:
Yes, I did. The same happens as currently with windowing: GroupByKey waits for
all data to arrive, and no output files appear until the end.
Maybe it’s not a memory issue and Beam sorts this out with temporary files. But
if there is a problem somewhere in those thousands of files and that amount of
data, I won’t be able to fix it and resume; the already preprocessed data in
temporary files is lost. Also, I cannot really see the progress.

Meanwhile I have realized that there’s something wrong with the AddTimestampDoFn,
because if, on a small sample, I collect its output into a list, the items are
not TimestampedValue objects, so windowing also could not occur. I suppose
that’s because the PCollection I’m reading from is not unbounded, and therefore
the timestamp is thrown away. It may sound like nonsense if you know Beam well,
but that’s my best guess right now.
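
For reference, such a DoFn is usually just a thin wrapper that emits
TimestampedValue objects; a simplified sketch (the timestamp parsing below is a
placeholder, not the actual log format):

import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue


class AddTimestampDoFn(beam.DoFn):
    def process(self, element):
        # Placeholder parsing: assume the first field of the line is a Unix
        # timestamp in seconds; a real log format would need its own parsing.
        unix_seconds = float(element.split()[0])
        yield TimestampedValue(element, unix_seconds)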

I have found a possible workaround: push the file contents into Pub/Sub, then
read the data from there with Beam/Dataflow. Dataflow even has a template for
that, called “GCS Text to Cloud PubSub”. Though, I can’t believe that there is
no simpler and more elegant way to solve this.

--
Bests,
Andor

From: Lukasz Cwik [mailto:[email protected]]
Sent: Friday, August 24, 2018 00:10
To: [email protected]
Subject: Re: Window bounded to unbounded PCollection

It is unclear what the purpose of windowing is since windowing doesn't impact 
memory utilization much on Google Dataflow.

The Direct runner uses memory proportional to the amount of data processed but 
the Google Dataflow runner does not. The Google Dataflow runner's memory usage 
is proportional to the size of your elements. You can reach out to Google Cloud 
Platform support with some job ids or additional questions if you want.

Have you tried a pipeline like this (without windowing):

lines = pipeline | 'read' >> beam.io.ReadFromText(known_args.input)

results = lines \
    | 'groupbykey' >> beam.GroupByKey() \
    | 'parse' >> beam.ParDo(ParseSendingsDoFn()).with_outputs(
        'too_few_rows', 'invalid_rows', 'missing_recipients', main='main_valid')

output = results['main_valid'] \
    | 'format' >> beam.Map(output_format) \
    | 'write' >> beam.io.WriteToText(known_args.output, file_name_suffix=".gz")


On Thu, Aug 23, 2018 at 5:32 AM Tóth Andor <[email protected]> wrote:
Hello,

I'm trying to process a few terabytes of mail logs without using too much
memory, by windowing the bounded source into an unbounded one.
Still, GroupByKey waits for all data to arrive. Can you give me hints on how
to work around this?

I have already searched and read the available manuals and documentation, but
I still don't have a clue.
Neither the Direct nor the Google Dataflow runner works.

I'm using Python. Every item gets a timestamp, and then they are sorted into
sliding windows by the following code:

lines = pipeline | 'read' >> beam.io.ReadFromText(known_args.input)

window_trigger = trigger.AfterWatermark()
sliding_window = beam.window.SlidingWindows(size=3600+600, period=3600)

windowed_lines = lines \
    | 'timestamp' >> beam.ParDo(AddTimestampDoFn()) \
    | 'window' >> beam.WindowInto(sliding_window, trigger=window_trigger,
                                  accumulation_mode=trigger.AccumulationMode.DISCARDING)

results = windowed_lines \
    | 'groupbykey' >> beam.GroupByKey() \
    | 'parse' >> beam.ParDo(ParseSendingsDoFn()).with_outputs(
        'too_few_rows', 'invalid_rows', 'missing_recipients', main='main_valid')

output = results['main_valid'] \
    | 'format' >> beam.Map(output_format) \
    | 'write' >> beam.io.WriteToText(known_args.output, file_name_suffix=".gz")

--
Bests,
Andor
