Yes, I did. The same thing happens as with windowing: GroupByKey waits for all the data to arrive, and no output files appear until the very end. Maybe it is not a memory issue and Beam sorts this out with temporary files. But if something goes wrong among thousands of files and that much data, I cannot fix the problem and resume, and the already preprocessed data in the temporary files is lost. I also cannot really see the progress.
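For reference, a minimal sketch of one thing that might surface partial GroupByKey output before all input has arrived: an early-firing trigger on the same sliding windows used in the pipeline quoted further down. The input path here is a placeholder, and whether early panes actually fire for a bounded source depends on the runner, so treat this only as a sketch:

import apache_beam as beam
from apache_beam.transforms import trigger
from apache_beam.transforms import window

with beam.Pipeline() as p:
    # Placeholder input path; the real pipeline reads known_args.input.
    lines = p | 'read' >> beam.io.ReadFromText('gs://my-bucket/maillog-*.gz')

    # Same sliding windows as in the original pipeline, plus early firings:
    # emit a pane roughly every 60 seconds of processing time, and a final
    # pane once the watermark passes the end of the window.
    windowed_lines = lines | 'window' >> beam.WindowInto(
        window.SlidingWindows(size=3600 + 600, period=3600),
        trigger=trigger.AfterWatermark(early=trigger.AfterProcessingTime(60)),
        accumulation_mode=trigger.AccumulationMode.DISCARDING)
    # The timestamp-assigning ParDo from the original pipeline would still
    # need to run before this WindowInto, and the GroupByKey after it.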
Meanwhile I have realized that there is something off with the AddTimestampDoFn: if I collect its output into a list on a small sample, the elements are not TimestampedValue objects, so windowing cannot happen either. I suppose that is because the PCollection I am reading from is not unbounded, so the timestamp is thrown away. It may sound like nonsense if you know Beam well, but that is my best guess right now.

I have found a possible workaround: push the file contents into PubSub and then read the data from there with Beam/Dataflow. Dataflow even has a template for that, called "GCS Text to Cloud PubSub". Still, I can't believe there is no simpler and more elegant way to solve this.

--
Bests,
Andor

From: Lukasz Cwik [mailto:lc...@google.com]
Sent: Friday, August 24, 2018 00:10
To: user@beam.apache.org
Subject: Re: Window bounded to unbounded PCollection

It is unclear what the purpose of windowing is, since windowing doesn't impact memory utilization much on Google Dataflow. The Direct runner uses memory proportional to the amount of data processed, but the Google Dataflow runner does not: its memory usage is proportional to the size of your elements. You can reach out to Google Cloud Platform support with some job ids or additional questions if you want.

Have you tried a pipeline like this (without windowing)?

lines = pipeline | 'read' >> beam.io.ReadFromText(known_args.input)

results = lines \
    | 'groupbykey' >> beam.GroupByKey() \
    | 'parse' >> beam.ParDo(ParseSendingsDoFn()).with_outputs(
        'too_few_rows', 'invalid_rows', 'missing_recipients',
        main='main_valid')

output = results['main_valid'] \
    | 'format' >> beam.Map(output_format) \
    | 'write' >> beam.io.WriteToText(known_args.output, file_name_suffix=".gz")

On Thu, Aug 23, 2018 at 5:32 AM Tóth Andor <andor.t...@centralmediacsoport.hu> wrote:

Hello,

I'm trying to process a few terabytes of mail logs without using too much memory, by windowing the bounded source into an unbounded one. Still, the GroupByKey waits for all data to arrive. Can you give me hints on how to work around this? I have already searched and read the available manuals and documentation, but I still don't have a clue. Neither the Direct nor the Google Dataflow runner works. I'm using Python.

Every item gets a timestamp, and then the items are sorted into sliding windows, by the following code:

lines = pipeline | 'read' >> beam.io.ReadFromText(known_args.input)

window_trigger = trigger.AfterWatermark()
sliding_window = beam.window.SlidingWindows(size=3600 + 600, period=3600)

windowed_lines = lines \
    | 'timestamp' >> beam.ParDo(AddTimestampDoFn()) \
    | 'window' >> beam.WindowInto(sliding_window,
                                  trigger=window_trigger,
                                  accumulation_mode=trigger.AccumulationMode.DISCARDING)

results = windowed_lines \
    | 'groupbykey' >> beam.GroupByKey() \
    | 'parse' >> beam.ParDo(ParseSendingsDoFn()).with_outputs(
        'too_few_rows', 'invalid_rows', 'missing_recipients',
        main='main_valid')

output = results['main_valid'] \
    | 'format' >> beam.Map(output_format) \
    | 'write' >> beam.io.WriteToText(known_args.output, file_name_suffix=".gz")

--
Bests,
Andor
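The AddTimestampDoFn used in the pipeline above is never shown in the thread. A minimal sketch of the canonical Beam Python pattern for assigning event-time timestamps looks roughly like this, with extract_timestamp as an assumed helper for pulling epoch seconds out of a log line:

import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue

class AddTimestampDoFn(beam.DoFn):
    """Sketch of the usual pattern for attaching event-time timestamps.

    This is not the thread's actual DoFn; it follows the canonical example
    from the Beam programming guide.
    """

    def process(self, element):
        # Hypothetical helper: parse Unix epoch seconds out of the log line.
        unix_timestamp = extract_timestamp(element)
        # The runner unwraps TimestampedValue and keeps the timestamp as
        # element metadata, so downstream transforms (and anything that
        # collects the output into a list) see the plain element again.
        yield TimestampedValue(element, unix_timestamp)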
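A minimal sketch of the Pub/Sub workaround mentioned at the top of this thread, assuming a placeholder topic that the "GCS Text to Cloud PubSub" template publishes the file lines to. Pub/Sub is an unbounded source, so the pipeline has to run with streaming enabled:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as p:
    lines = (
        p
        # Placeholder topic name for illustration only.
        | 'read_pubsub' >> beam.io.ReadFromPubSub(
            topic='projects/my-project/topics/mail-log-lines')
        # Pub/Sub messages arrive as bytes; decode them back into text lines.
        | 'decode' >> beam.Map(lambda message: message.decode('utf-8')))
    # ... the timestamp / window / groupbykey / parse / write steps from the
    # original pipeline would follow here ...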