tsreaper commented on pull request #17520: URL: https://github.com/apache/flink/pull/17520#issuecomment-959002265
@slinkydeveloper @fapaul @JingGe I've done some benchmarking on a testing YARN cluster.

* Test data: The [Kaggle flight delay data](https://www.kaggle.com/usdot/flight-delays), a ~500MB CSV file
* Number of task slots: 8
* Number of task managers: 1
* Configuration

```yaml
# common JVM configurations used in a lot of our production jobs, also used for producing the TPC-DS benchmark results in Flink 1.12
env.java.opts.jobmanager: -XX:+UseParNewGC -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -verbose:gc -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSMaxAbortablePrecleanTime=1000 -XX:+CMSClassUnloadingEnabled -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:SurvivorRatio=5 -XX:ParallelGCThreads=4
env.java.opts.taskmanager: -XX:+UseParNewGC -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -verbose:gc -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSMaxAbortablePrecleanTime=1000 -XX:+CMSClassUnloadingEnabled -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:SurvivorRatio=5 -XX:ParallelGCThreads=4

# default memory configuration of Flink
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
```

I've tested three implementations.

* Bulk Format + Lazy Reading: This is the implementation of this PR.
* Bulk Format + ArrayList: This implementation reads and deserializes all records of the whole block into an array list and sends it to the reader thread. This implementation does not have a blocking pool, as @JingGe suggested. See [here](https://github.com/tsreaper/flink/commit/3b86337cea499cd4245a34550a6b597239be3066) for code.
* Stream Format: This is the implementation based on Stephan's [draft](https://github.com/apache/flink/commit/11c606096f6beeac45c4f4dabe0fde93cc91923d#diff-edfd2d187d920f781382054f22fb4e6e5b5d9361b95a87ebeda68ba3a49d5a55R51). See [here](https://github.com/tsreaper/flink/commit/6b3a65fd099fcffb4d7a5b20c9bde9aeace18f69) for code. I didn't implement projection pushdown for this, but that should be fine because there is no projection pushdown in the benchmark.

Here are the test results.

||xz compression, 64kb block size|xz compression, 2mb block size|
|---|---|---|
|bulk format + lazy reading|14s|10s|
|bulk format + array list|14s|30s, due to GC|
|stream format|2m24s, due to heavy GC|51s, due to GC|

It is clear that any implementation which loads all records of a block into memory at once suffers from GC to some degree. Also, for smaller block sizes the blocking pool has almost no impact on performance. So I would say the implementation in this PR is the most suitable implementation so far.
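For readers skimming the thread, the difference between the first two bulk-format variants can be sketched roughly as follows. This is an illustrative sketch only, not Flink's actual API: the class and method names are made up, and `deserialize` stands in for the real Avro record deserialization. The point is that lazy reading keeps at most one deserialized record live per `next()` call, while the ArrayList variant materializes every record of the block at once, which is what pressures the GC on large (e.g. 2 MB) blocks.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch contrasting the two bulk-format strategies benchmarked
// above. Names are illustrative; deserialize() stands in for Avro decoding.
public class BlockReadingSketch {

    // "Bulk Format + Lazy Reading": deserialize one record per next() call,
    // so at most one freshly deserialized record is live at a time.
    static Iterator<String> lazyRecords(byte[] block, int recordCount) {
        return new Iterator<String>() {
            int nextIndex = 0;

            public boolean hasNext() {
                return nextIndex < recordCount;
            }

            public String next() {
                return deserialize(block, nextIndex++);
            }
        };
    }

    // "Bulk Format + ArrayList": deserialize every record of the block up
    // front and hand the whole list to the reader thread; all recordCount
    // objects are live simultaneously.
    static List<String> eagerRecords(byte[] block, int recordCount) {
        List<String> records = new ArrayList<>(recordCount);
        for (int i = 0; i < recordCount; i++) {
            records.add(deserialize(block, i));
        }
        return records;
    }

    // Stand-in for real Avro record deserialization.
    static String deserialize(byte[] block, int index) {
        return "record-" + index;
    }

    public static void main(String[] args) {
        byte[] block = new byte[64 * 1024]; // pretend compressed block

        Iterator<String> lazy = lazyRecords(block, 3);
        int lazyCount = 0;
        while (lazy.hasNext()) {
            lazy.next();
            lazyCount++;
        }

        List<String> eager = eagerRecords(block, 3);

        System.out.println(lazyCount);    // 3
        System.out.println(eager.size()); // 3
    }
}
```

Both variants yield the same records; they differ only in how many deserialized objects are alive at once, which matches the table above where the two tie at 64 KB blocks but diverge at 2 MB.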
