[ https://issues.apache.org/jira/browse/HUDI-1013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vinoth Chandar updated HUDI-1013:
---------------------------------
    Status: Patch Available  (was: In Progress)

> Bulk Insert w/o converting to RDD
> ---------------------------------
>
>                 Key: HUDI-1013
>                 URL: https://issues.apache.org/jira/browse/HUDI-1013
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: Writer Core
>            Reporter: sivabalan narayanan
>            Assignee: sivabalan narayanan
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.6.0
>
>
> Our bulk insert (and, in fact, every other write operation) converts the Dataset to an RDD in HoodieSparkSqlWriter, and our HoodieClient operates on JavaRDD<HoodieRecord>s. We want to see whether avoiding this RDD conversion improves performance. We will start with bulk insert and get it working end to end, then decide, after some performance analysis, whether to do the same for the other operations.
>
> At a high level, the idea is:
> 1. Pass the Dataset<Row> all the way from the Spark SQL writer to the storage writer, without converting to HoodieRecord at any point.
> 2. Use [ParquetWriteSupport|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala] to write to Parquet as InternalRows.
> 3. In short: sort the Dataset<Row> by partition path and record key, repartition by the parallelism config, and call mapPartitions. Within mapPartitions, iterate through the Rows, encode them to InternalRows, and write them to Parquet using the write support linked above.
>
> We first wanted to check whether this strategy actually improves performance, so I did a quick hack of just the mapPartitions function in HoodieSparkSqlWriter to see what the numbers look like. Check for the operation "bulk_insert_direct_parquet_write_support" [here|#diff-5317f4121df875e406876f9f0f012fac].
> These are the numbers I got.
> (1) is the existing Hudi bulk insert, which converts the data to JavaRDD<HoodieRecord>s. (2) is writing directly to Parquet in Spark (code given below). (3) is the modified Hudi code, i.e. the operation bulk_insert_direct_parquet_write_support.
>
> || ||Time (5M records, parallelism 100, input size 2.5 GB)||Output size||
> |(1) Original Hudi (unmodified)|169 secs|2.7 GB|
> |(2) Plain Parquet|62 secs|2.5 GB|
> |(3) Modified Hudi code, direct Parquet write|73 secs|2.5 GB|
>
> So our existing bulk insert takes more than 2x as long as the plain Parquet write (169 secs vs. 62 secs, roughly 2.7x). Our modified Hudi code (operation bulk_insert_direct_parquet_write_support) is close to a direct Parquet write in Spark (73 secs vs. 62 secs, roughly 18% slower), which suggests our strategy should work.
>
> // This is the Parquet write in Spark, (2) above.
> transformedDF.sort("partition", "key")
>   .coalesce(parallelism)
>   .write.format("parquet")
>   .partitionBy("partition")
>   .mode(saveMode)
>   .save(s"$outputPath/$format")
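To make step 3 of the plan more concrete, here is a minimal, Spark-free sketch in plain Scala. It is not Hudi or Spark code: `Row`, `FileSlice` and `bulkInsertLayout` are hypothetical names, the "files" are in-memory key lists rather than Parquet files, and cutting the sorted rows into contiguous chunks only stands in for repartitioning by the parallelism config (it assumes each task still sees its rows in sorted order). What it illustrates is why the sort by partition path and record key helps: each task can stream its rows and start a new file whenever the partition path changes, instead of keeping several writers open at once.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, Spark-free model of the bulk insert plan in HUDI-1013.
// Row, FileSlice and bulkInsertLayout are illustrative names, not Hudi or Spark APIs.
object BulkInsertSketch {
  final case class Row(partitionPath: String, recordKey: String)

  // One output "file": the record keys one task wrote for one partition path.
  final case class FileSlice(task: Int, partitionPath: String, keys: List[String])

  def bulkInsertLayout(rows: Seq[Row], parallelism: Int): List[FileSlice] = {
    require(parallelism > 0, "parallelism must be positive")

    // Sort by (partition path, record key).
    val sorted = rows.sortBy(r => (r.partitionPath, r.recordKey))

    // Cut the sorted rows into at most `parallelism` contiguous chunks. This stands in
    // for the repartition step and assumes each task still sees its rows in sorted order.
    val chunkSize = math.max(1, math.ceil(sorted.size.toDouble / parallelism).toInt)
    val tasks = sorted.grouped(chunkSize).toList

    // Per task (the mapPartitions step): stream the rows and start a new "file"
    // whenever the partition path changes. A real writer would encode each Row to an
    // InternalRow and hand it to a Parquet writer here instead of buffering keys.
    tasks.zipWithIndex.flatMap { case (taskRows, taskId) =>
      val slices = ArrayBuffer.empty[FileSlice]
      val buffer = ArrayBuffer.empty[String]
      var currentPath: Option[String] = None

      // Emit the buffered keys as one file for the current partition path, if any.
      def closeFile(): Unit = currentPath.foreach { path =>
        slices += FileSlice(taskId, path, buffer.toList)
        buffer.clear()
      }

      taskRows.foreach { r =>
        if (!currentPath.contains(r.partitionPath)) {
          closeFile()
          currentPath = Some(r.partitionPath)
        }
        buffer += r.recordKey
      }
      closeFile()
      slices.toList
    }
  }
}
```

For example, with four rows spread over two partition paths and a parallelism of 2, each of the two tasks writes a single file for one partition path; with a parallelism of 1, the single task writes the two files one after the other. In this model, without the initial sort a task could see partition paths interleaved and would need to either keep multiple files open or split its output into more, smaller files.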