Hi,

I have a pipeline that continuously reads files from a folder where files can be
created at high throughput (with peaks around 100 files/sec). Each file is a
few MB of Avro data.

The only purpose of my pipeline is to read those files as they arrive,
deserialize them into GenericRecords, and write them out as .parquet files
to a different location.


The simplified code looks as follows:


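pipeline
    .apply("Get patterns from Configuration", Create.of(getPaths()))
    .apply("Match All Continuously",
        FileIO.matchAll()
            // poll every 5 s; Watch.Growth.never() is an assumed termination condition
            .continuously(Duration.standardSeconds(5), Watch.Growth.never()))
    .apply(FileIO.readMatches())
    // parseFn is a placeholder for the elided parse function
    .apply(AvroIO.parseFilesGenericRecords(parseFn))
    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
    // schema and the destination/naming settings of writeDynamic() are elided
    .apply(FileIO.<String, GenericRecord>writeDynamic()
        .via(ParquetIO.sink(schema)));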


The issue is that all elements coming out of FileIO.matchAll() end up with
the same timestamp if they were retrieved during the same poll. For example,
if matchAll() matched 500 files, all records from those files get the same
timestamp and end up in the same window (in some cases causing out-of-memory
(OOM) errors).

I have tried adding a ParDo that reassigns the timestamps to local time after
the records are parsed, but without any noticeable effect.
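One possible explanation, not verified against this pipeline: if a burst of files is parsed in less time than the window size, stamping records with local (processing) time still puts most of them in the same 10-second window. The plain-Java sketch below models this; in Beam the re-stamping itself would typically be a DoFn calling outputWithTimestamp. RestampModel, the burst length and the record counts are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model: re-stamping records with processing time during a parse burst. */
public class RestampModel {

    /** Start of the fixed window containing tsMillis; assumes tsMillis >= 0. */
    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - (tsMillis % sizeMillis);
    }

    /**
     * Simulates n records parsed evenly over burstMillis starting at startMillis,
     * each stamped with its simulated parse time, and counts records per window.
     */
    static Map<Long, Integer> windowsAfterRestamp(
            long startMillis, long burstMillis, int n, long sizeMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (int i = 0; i < n; i++) {
            long parseTime = startMillis + (burstMillis * i) / n;
            counts.merge(windowStart(parseTime, sizeMillis), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical burst: 500 records parsed within 3 s, 10-second windows.
        // Prints {0=500}: re-stamping alone does not spread this burst out.
        System.out.println(windowsAfterRestamp(1_000L, 3_000L, 500, 10_000L));
        // A burst spread over 30 s would instead split across three windows.
        System.out.println(windowsAfterRestamp(0L, 30_000L, 300, 10_000L));
    }
}
```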

I have also tried adding a "rate limiter" just after matching the files and
before reading them, but in that case records get stuck in FileIO's
"groupByShards" step. I'm not sure, but I think this might be because the
watermark has not advanced, so the step is still waiting for all the files
that were matched by the matchAll() operation.
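For reference, a rate limiter of the kind mentioned above could look like the token-bucket sketch below, with an injectable clock so it can be tested. TokenBucket is a hypothetical helper, not part of Beam and not necessarily what was used here. Note that it only controls how fast matched files are handed on; it does not change the timestamps assigned at match time.

```java
/**
 * Hypothetical token-bucket limiter with an injectable clock (epoch millis).
 * Tokens are tracked in thousandths of a permit so the arithmetic stays exact.
 */
public class TokenBucket {
    private final long permitsPerSecond;
    private final long capacityMilli;   // max burst, in thousandths of a permit
    private long tokensMilli;           // available tokens, in thousandths of a permit
    private long lastRefillMillis;

    TokenBucket(long permitsPerSecond, long burstCapacity, long nowMillis) {
        this.permitsPerSecond = permitsPerSecond;
        this.capacityMilli = burstCapacity * 1000L;
        this.tokensMilli = capacityMilli;   // start with a full bucket
        this.lastRefillMillis = nowMillis;
    }

    /** Consumes one permit and returns true if one is available at nowMillis. */
    boolean tryAcquire(long nowMillis) {
        if (nowMillis > lastRefillMillis) {
            // elapsed ms * permits/s = thousandths of a permit earned
            long earned = (nowMillis - lastRefillMillis) * permitsPerSecond;
            tokensMilli = Math.min(capacityMilli, tokensMilli + earned);
            lastRefillMillis = nowMillis;
        }
        if (tokensMilli >= 1000L) {
            tokensMilli -= 1000L;
            return true;
        }
        return false;
    }

    /** Offers n files at the same instant and returns how many are admitted. */
    static int admitBurst(TokenBucket limiter, int n, long nowMillis) {
        int admitted = 0;
        for (int i = 0; i < n; i++) {
            if (limiter.tryAcquire(nowMillis)) {
                admitted++;
            }
        }
        return admitted;
    }

    public static void main(String[] args) {
        // Hypothetical limit: 20 files/s, bursts of up to 20; 500 files matched at t=0.
        TokenBucket limiter = new TokenBucket(20L, 20L, 0L);
        System.out.println(admitBurst(limiter, 500, 0L));      // prints 20
        System.out.println(admitBurst(limiter, 500, 1_000L));  // prints 20 (refilled after 1 s)
    }
}
```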


Any ideas on how to solve this?


Thanks,

Jean Wisser.




Jean Wisser
Data Engineer


Flow Traders B.V.

T: +31 20 799 6497
F: +31 20 799 6780

Jacob Bontiusplaats 9
1018 LL Amsterdam
Nederland
www.flowtraders.com

