tsreaper commented on pull request #17520:
URL: https://github.com/apache/flink/pull/17520#issuecomment-959002265


   @slinkydeveloper @fapaul @JingGe
   
   I've done some benchmarking on a testing yarn cluster.
   
   * Test data: The [Kaggle flight delay data](https://www.kaggle.com/usdot/flight-delays), a ~500 MB CSV file
   * Number of task slots: 8
   * Number of task managers: 1
   * Configuration
   ```yaml
   # Common JVM configuration used in many of our production jobs, and also for producing the TPC-DS benchmark results in Flink 1.12
   env.java.opts.jobmanager: -XX:+UseParNewGC -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -verbose:gc -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSMaxAbortablePrecleanTime=1000 -XX:+CMSClassUnloadingEnabled -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:SurvivorRatio=5 -XX:ParallelGCThreads=4
   env.java.opts.taskmanager: -XX:+UseParNewGC -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -verbose:gc -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSMaxAbortablePrecleanTime=1000 -XX:+CMSClassUnloadingEnabled -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:SurvivorRatio=5 -XX:ParallelGCThreads=4
   
   # default memory configuration of Flink
   jobmanager.memory.process.size: 1600m
   taskmanager.memory.process.size: 1728m
   ```
   
   I've tested three implementations.
   * Bulk Format + Lazy Reading: This is the implementation in this PR.
   * Bulk Format + ArrayList: This implementation reads and deserializes all records of a whole block into an array list and sends it to the reader thread. This implementation does not have a blocking pool as @JingGe suggested. See [here](https://github.com/tsreaper/flink/commit/3b86337cea499cd4245a34550a6b597239be3066) for the code.
   * Stream Format: This is the implementation based on Stephan's [draft](https://github.com/apache/flink/commit/11c606096f6beeac45c4f4dabe0fde93cc91923d#diff-edfd2d187d920f781382054f22fb4e6e5b5d9361b95a87ebeda68ba3a49d5a55R51). See [here](https://github.com/tsreaper/flink/commit/6b3a65fd099fcffb4d7a5b20c9bde9aeace18f69) for the code. I didn't implement projection pushdown for this one, but that should be fine because the benchmark does not use projection pushdown.
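
To make the difference between lazy reading and the array list approach concrete, here is a minimal, self-contained sketch. It is not the code from this PR or from the linked commits: the class `BlockReadingSketch`, the methods `eager` and `lazy`, and the representation of a block as a `List<byte[]>` are all hypothetical and only illustrate when deserialized records become live on the heap.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical sketch; none of these names come from the PR. */
public class BlockReadingSketch {

    /** Array list style: deserialize every record of the block up front. */
    static <T> Iterator<T> eager(List<byte[]> block, Function<byte[], T> deserializer) {
        List<T> records = new ArrayList<>(block.size());
        for (byte[] raw : block) {
            records.add(deserializer.apply(raw));
        }
        // All deserialized records of the block are live at this point.
        return records.iterator();
    }

    /** Lazy style: deserialize one record per next() call. */
    static <T> Iterator<T> lazy(List<byte[]> block, Function<byte[], T> deserializer) {
        return new Iterator<T>() {
            private int position = 0;

            @Override
            public boolean hasNext() {
                return position < block.size();
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                // Only the record being returned is materialized here.
                return deserializer.apply(block.get(position++));
            }
        };
    }

    public static void main(String[] args) {
        List<byte[]> block = Arrays.asList(
                "a".getBytes(StandardCharsets.UTF_8),
                "b".getBytes(StandardCharsets.UTF_8),
                "c".getBytes(StandardCharsets.UTF_8));

        // Count deserializer invocations to see when the work happens.
        AtomicInteger calls = new AtomicInteger();
        Function<byte[], String> deserializer = raw -> {
            calls.incrementAndGet();
            return new String(raw, StandardCharsets.UTF_8);
        };

        eager(block, deserializer);
        System.out.println("eager, before first next(): " + calls.get());

        calls.set(0);
        Iterator<String> lazyRecords = lazy(block, deserializer);
        System.out.println("lazy, before first next(): " + calls.get());
        lazyRecords.next();
        System.out.println("lazy, after one next(): " + calls.get());
    }
}
```

In the eager variant the number of live record objects scales with the block size, whereas the lazy variant only materializes the record currently being consumed. The sketch ignores decompression, the blocking pool, and the hand-off between threads.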
   
   Here are the test results.
   
   ||xz compression, 64 KB block size|xz compression, 2 MB block size|
   |---|---|---|
   |bulk format + lazy reading|14s|10s|
   |bulk format + array list|14s|30s, due to GC|
   |stream format|2m24s, due to heavy GC|51s, due to GC|
   
   These results show that any implementation which loads all records of a block into memory at once suffers from GC to some degree. Also, for smaller block sizes the blocking pool has almost no impact on performance: both bulk format variants take 14s with 64 KB blocks. So I would say the implementation in this PR is the most suitable one so far.

