Hi there, 

I have the following (probably very common) use case: I have some lookup data 
(100 million records) which changes only slowly (in the range of a few thousand 
updates per day). My event stream is on the order of tens of billions of events 
per day, and each event needs to be enriched from the 100 million record lookup 
source. For the JOIN, I don't need any event-time semantics; the newest version 
at the time of enrichment should simply be taken into account. 

As a user who is used to the DataStream API but unfamiliar with the SQL API, I 
built a small MVP (roughly the sketch below). I used a connected stream and put 
the enrichment data into keyed (heap) state. I have enough RAM to hold all the 
data in memory (at least once in prod). I first streamed in all 100 million 
records, then I started the performance measurement by streaming in just 3 
million events to be enriched against the 100 million records. I was a bit 
stunned that the enrichment of all events took about 40 seconds on my local 
machine. I built a similar MVP in Spark where I put the 100 million records 
into a Hive table (pre-partitioned on the JOIN column), the 3 million test 
events into a Parquet file, and then ran an outer join, which also took about 
40 seconds on my local machine (consuming only 16GB of RAM). I somehow expected 
Flink to be much faster, since I already hold the enrichment data in memory 
(state) and, at least on localhost, there is no real networking involved. 
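
For reference, the MVP looks roughly like this (Event, LookupRecord and 
EnrichedEvent stand in for my real classes, and the sources are omitted; this 
is only a sketch of the approach, not my exact code): 

    DataStream<Event> events = ...;            // the stream to enrich
    DataStream<LookupRecord> lookupData = ...; // the 100 million slowly changing records

    events
        .connect(lookupData)
        .keyBy(e -> e.getJoinKey(), r -> r.getJoinKey())
        .process(new KeyedCoProcessFunction<String, Event, LookupRecord, EnrichedEvent>() {

            private transient ValueState<LookupRecord> latest;

            @Override
            public void open(Configuration parameters) {
                latest = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("latest", LookupRecord.class));
            }

            @Override
            public void processElement1(Event event, Context ctx,
                                        Collector<EnrichedEvent> out) throws Exception {
                // enrich with whatever version is currently in state (processing-time semantics)
                out.collect(new EnrichedEvent(event, latest.value()));
            }

            @Override
            public void processElement2(LookupRecord record, Context ctx,
                                        Collector<EnrichedEvent> out) throws Exception {
                // the latest record per key wins
                latest.update(record);
            }
        });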

I then thought about the problems with the DataStream API: my 100 million 
lookup records are read from an uncompressed CSV file which is 25GB in size. 
Deserialized to Java POJOs, I guess the POJOs would take 100GB of heap space. 
[Actually, I ran the tests in Spark with all 100 million records but this Flink 
test with only 20 million records because of the memory consumption, so the 
100GB is an extrapolation from 20 million records taking 20GB of heap space.] 
When I stopped parsing my enrichment data into POJOs and instead extracted only 
the enrichment (join) attribute and kept the remaining part of each record as a 
plain string (see the snippet below), the Java heap used was again only about 
25GB for all 100 million records. On top of that, the enrichment JOIN now took 
only 30 seconds to complete for all records. My thought now is: I probably 
shouldn't use the DataStream API with Java POJOs here, but rather the Flink SQL 
API with "Row" classes? I remember once reading a blog post about how Flink 
internally optimizes its data structures and can reuse certain objects when 
using the SQL API, and so on. 
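
Concretely, the change was roughly the following (assuming, just for this 
mail, a semicolon-delimited file with the join attribute in the first column 
and rawCsvLines being a DataStream<String> of the CSV lines): 

    // keep only the join key parsed; the rest of the CSV line stays an unparsed String
    DataStream<Tuple2<String, String>> lookupData = rawCsvLines
            .map(line -> {
                int idx = line.indexOf(';');
                return Tuple2.of(line.substring(0, idx),   // join attribute
                                 line.substring(idx + 1)); // remaining columns, unparsed
            })
            .returns(Types.TUPLE(Types.STRING, Types.STRING));

    // ... and the keyed state then holds a plain String instead of a POJO:
    ValueState<String> latest; // ValueStateDescriptor<>("latest", String.class)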

Before I try out several variants, my question is: what do you think is the 
fastest/most efficient way to enrich events against slowly changing data with 
its latest version (a processing-time temporal table JOIN; see the SQL sketch 
below for what I mean), given that memory isn't a big problem once deployed to 
the cluster? Do you recommend using the SQL API? If so, with which type of JOIN 
(a processing-time temporal table join?), and can I hold the enrichment table 
fully in Flink managed memory (can I even express this via the SQL API?), or do 
I need to use some external "LookupTableSource"? Once I run my application in 
the cluster, I suspect a "LookupTableSource" would introduce some communication 
overhead compared to querying Flink state directly? If you recommend the 
DataStream API: should I read via the SQL connectors and work with "Row"s in 
state? What kind of performance tuning should I take into account here 
(enableObjectReuse, disableChaining, ...)? 
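
To make clear what I mean by a processing-time temporal table join, I am 
thinking of something like the following (table names, columns and connector 
options are made up or omitted, so treat the DDL as pseudo-code): 

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    tEnv.executeSql(
        "CREATE TABLE events (" +
        "  join_key  STRING," +
        "  payload   STRING," +
        "  proc_time AS PROCTIME()" +
        ") WITH (...)");                       // e.g. a Kafka source

    tEnv.executeSql(
        "CREATE TABLE lookup_data (" +
        "  join_key   STRING," +
        "  attributes STRING," +
        "  PRIMARY KEY (join_key) NOT ENFORCED" +
        ") WITH (...)");                       // e.g. JDBC/HBase as a LookupTableSource

    // processing-time temporal join: always use the newest version of lookup_data
    tEnv.executeSql(
        "SELECT e.join_key, e.payload, l.attributes " +
        "FROM events AS e " +
        "JOIN lookup_data FOR SYSTEM_TIME AS OF e.proc_time AS l " +
        "ON e.join_key = l.join_key");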

Best regards 
Theo 
