Hello,

I'm trying to process a few terabytes of mail logs without using too much
memory, by windowing the bounded source as if it were an unbounded one.
Still, GroupByKey waits for all the data to arrive before emitting anything.
Can you give me hints on how to work around this?
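
For what it's worth, my guess is that the watermark-only trigger is the
culprit and that early firings have to be requested explicitly, roughly like
below (the 60-second processing-time delay is just a placeholder I made up),
but I'm not sure this is the right direction:

window_trigger = trigger.AfterWatermark(
    early=trigger.AfterProcessingTime(60))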

I have already searched and read the available manuals and documentation,
but I still don't have a clue.
Neither the Direct runner nor the Google Dataflow runner works.

I'm using Python. Every item gets a timestamp and is then assigned to
sliding windows by the following code:

import apache_beam as beam
from apache_beam.transforms import trigger

lines = pipeline | 'read' >> beam.io.ReadFromText(known_args.input)

window_trigger = trigger.AfterWatermark()
sliding_window = beam.window.SlidingWindows(size=3600 + 600, period=3600)

windowed_lines = (lines
                  | 'timestamp' >> beam.ParDo(AddTimestampDoFn())
                  | 'window' >> beam.WindowInto(
                      sliding_window,
                      trigger=window_trigger,
                      accumulation_mode=trigger.AccumulationMode.DISCARDING))

results = (windowed_lines
           | 'groupbykey' >> beam.GroupByKey()
           | 'parse' >> beam.ParDo(ParseSendingsDoFn()).with_outputs(
               'too_few_rows', 'invalid_rows', 'missing_recipients',
               main='main_valid'))

output = (results['main_valid']
          | 'format' >> beam.Map(output_format)
          | 'write' >> beam.io.WriteToText(known_args.output,
                                           file_name_suffix='.gz'))
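
For completeness, AddTimestampDoFn is not shown above; simplified, it does
roughly the following (the actual field layout and the grouping key are
specific to my logs):

class AddTimestampDoFn(beam.DoFn):
    def process(self, element):
        # Simplified sketch: treat the first whitespace-separated field
        # as an epoch timestamp and the second as the grouping key that
        # the GroupByKey above relies on.
        fields = element.split()
        yield beam.window.TimestampedValue(
            (fields[1], element), float(fields[0]))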

--
Best,
Andor
