[ https://issues.apache.org/jira/browse/HUDI-1013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-1013:
--------------------------------------
    Description: 
Our bulk insert (in fact, not just bulk insert, but all operations) does a Dataset-to-RDD 
conversion in HoodieSparkSqlWriter, and our HoodieClient deals with 
JavaRDD<HoodieRecord>s. We are trying to see if we can improve performance 
by avoiding the RDD conversion. We will first start with bulk insert and get it 
working end to end, and then decide, after some perf analysis, whether to do 
this for other operations too. 

 

At a high level, this is the idea:

1. Dataset<Row> will be passed all the way from the Spark SQL writer to the 
storage writer. We do not convert to HoodieRecord at any point. 

2. We need to use 
[ParquetWriteSupport|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala] 
to write to Parquet as InternalRows.

3. So the gist of what we want to do is: with the Dataset<Row>, sort by partition 
path and record key, repartition by the parallelism config, and do a mapPartitions. 
Within mapPartitions, we iterate through the Rows, encode them to InternalRows, 
and write to Parquet using the write support linked above. A rough sketch of this 
flow is given right below. 
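
To make the plan concrete, here is a rough sketch of steps 1-3 in Scala. This is 
not the actual implementation: the method name bulkInsertAsRows and the output 
file naming are made up for illustration, it assumes Spark 2.4-era APIs 
(RowEncoder(schema).toRow and a deprecated ParquetWriter constructor), and the 
real change would plumb the Dataset<Row> from HoodieSparkSqlWriter into the 
storage writer.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.spark.TaskContext
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport
import org.apache.spark.sql.types.StructType

// Hypothetical entry point; in reality this logic would sit behind the bulk
// insert path of HoodieSparkSqlWriter / the storage writer.
def bulkInsertAsRows(df: Dataset[Row], parallelism: Int, basePath: String): Unit = {
  val schema: StructType = df.schema

  df.sort("partition", "key")   // step 3: sort by partition path and record key
                                // (column names as in the prototype below)
    .coalesce(parallelism)      // step 3: honor the configured bulk insert parallelism
    .rdd
    .foreachPartition { rows =>
      val partitionId = TaskContext.getPartitionId()

      // step 2: Spark's ParquetWriteSupport reads the Catalyst schema (and a
      // couple of SQLConf keys) from the Hadoop conf in its init().
      val conf = new Configuration()
      ParquetWriteSupport.setSchema(schema, conf)
      conf.set("spark.sql.parquet.writeLegacyFormat", "false")
      conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")

      // Illustrative file naming only; real code would write into per-partition-path
      // directories and follow Hudi's file naming/sizing rules.
      val writer = new ParquetWriter[InternalRow](
        new Path(s"$basePath/part-$partitionId.parquet"),
        conf,
        new ParquetWriteSupport())

      // step 1 + 3: no HoodieRecord anywhere; encode each Row straight to an
      // InternalRow and hand it to the write support.
      val encoder = RowEncoder(schema)
      try {
        rows.foreach(row => writer.write(encoder.toRow(row)))
      } finally {
        writer.close()
      }
    }
}

The key point of the sketch is that records never become HoodieRecords; they stay 
as Rows/InternalRows end to end and go through the same write support Spark 
itself uses for Parquet.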

We first wanted to check whether this strategy actually improves perf. So I 
did a quick prototype of just the mapPartitions function in HoodieSparkSqlWriter 
to see what the numbers look like. Check the operation 
"bulk_insert_direct_parquet_write_support" 
[here|https://github.com/nsivabalan/hudi/commit/b70fe56d4c0648ffc2ed5111c910a7580af2ea63#diff-5317f4121df875e406876f9f0f012fac].
 

These are the numbers I got. (1) is the existing Hudi bulk insert. (2) is writing 
directly to Parquet in Spark (code given below). (3) is the modified Hudi code, 
i.e. operation bulk_insert_direct_parquet_write_support.

 
|Approach|5M records, parallelism 100, input size 2.5 GB|
|(1) Original Hudi bulk insert (unmodified)|169 secs, output size 2.7 GB|
|(2) Direct Parquet write in Spark|62 secs, output size 2.5 GB|
|(3) Modified Hudi code, direct Parquet write|73 secs, output size 2.5 GB|

 

So, essentially, our existing bulk insert takes more than 2x the time of a plain 
Parquet write. Our modified Hudi code (i.e. operation 
bulk_insert_direct_parquet_write_support) is close to the direct Parquet write in 
Spark, which shows that the strategy should work. 

// This is the Parquet write in Spark, i.e. (2) above.
transformedDF.sort("partition", "key")
  .coalesce(parallelism)
  .write.format("parquet")
  .partitionBy("partition")
  .mode(saveMode)
  .save(s"$outputPath/$format")

 

> Bulk Insert w/o converting to RDD
> ---------------------------------
>
>                 Key: HUDI-1013
>                 URL: https://issues.apache.org/jira/browse/HUDI-1013
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: Writer Core
>            Reporter: sivabalan narayanan
>            Priority: Blocker
>             Fix For: 0.6.0
>
>



