Hi,
I have a pipeline that continuously reads files from a folder where files can be created at a high rate (with peaks around 100 files/sec). Each file contains a few MB of Avro data. The only purpose of the pipeline is to read those files as they arrive, deserialize them into GenericRecords, and write them back as .parquet files at a different location.

The simplified code looks as follows:

    pipeline
        .apply("Get patterns from Configuration", Create.of(getPaths()))
        .apply("Match All Continuously", FileIO.matchAll() /* polling every 5 seconds */)
        .apply(FileIO.readMatches())
        .apply(AvroIO.parseFilesGenericRecords())
        .apply(/* window of 10 seconds */)
        .apply(FileIO.writeDynamic() /* with ParquetIO.sink() */)

The issue is that all elements coming out of FileIO.matchAll() end up with the same timestamp if they were retrieved during the same polling round. For example, if matchAll() matched 500 files, all records from those files have the same timestamp and end up in the same window (and in some cases this throws OOM exceptions).

I have tried adding a ParDo after parsing the records to reassign their timestamps to the local time, but without any noticeable effect.

I have also tried adding a "rate limiter" just after matching files and before reading them, but in that case records get stuck in the "groupByShards" operation of FileIO. I'm not sure, but I think it might be because the watermark is still unchanged, so it still waits for all files that were matched in the matchAll() operation.

Any ideas on how to solve this?

Thanks,
Jean Wisser
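P.S. To make the windowing problem concrete, here is a minimal plain-Java sketch (not Beam code, and not my actual pipeline). It assumes that a fixed window is chosen by rounding an element's timestamp down to a multiple of the window size, with no offset. The class name, method names, poll time and the 100 ms spacing are all hypothetical and only serve the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; none of this is Beam API.
class WindowSketch {

    // Start of the fixed window that contains timestampMillis
    // (assumption: windows are aligned to multiples of the window size).
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    // Counts how many timestamps fall into each window, keyed by window start.
    static Map<Long, Integer> countPerWindow(List<Long> timestamps, long windowSizeMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(windowStart(ts, windowSizeMillis), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long windowSize = 10_000L;          // 10-second windows, as in the pipeline
        long pollTime = 1_000_000_003_000L; // hypothetical polling instant

        List<Long> samePoll = new ArrayList<>();
        List<Long> spread = new ArrayList<>();
        for (int i = 0; i < 500; i++) {
            samePoll.add(pollTime);           // every file stamped with the poll time
            spread.add(pollTime + i * 100L);  // hypothetical: one file every 100 ms
        }

        // All 500 identical timestamps map to a single window.
        System.out.println(countPerWindow(samePoll, windowSize));
        // Distinct timestamps are distributed over several windows.
        System.out.println(countPerWindow(spread, windowSize));
    }
}
```

This is the behaviour I would like to get away from: when every file matched in one polling round carries the same timestamp, the whole batch collapses into one window, whereas timestamps that reflect when each file actually arrived would spread the load.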