ahmed-elfar commented on issue #1498: Migrating parquet table to hudi issue 
[SUPPORT]
URL: https://github.com/apache/incubator-hudi/issues/1498#issuecomment-612650421
 
 
   @vinothchandar Thank you very much for your answer and help, and thanks to everyone who participated in resolving the issue.
   
   We have already made further attempts with Hudi bulk-insert, but I didn't want to mention them in this thread until we were sure **whether we are missing something or there is an inherently high cost to migrating the base tables from parquet to Hudi**. We need to know the pros, cons and limitations before adding Hudi to our product, since we are talking about migrating hundreds of GB and/or TB of data from parquet to Hudi tables.
   
   So I will share more details which might help, and answer your suggestions and questions.
   
   First I will share information about the data:
   
   - For the initial benchmarking we generated standard **tpch** data at 1GB, 10GB, 30GB, 100GB, 265GB and 1TB. The table mentioned in the previous discussion is **lineitem**, generated by tpch at 30GB and 100GB, with original parquet sizes of 6.7GB and 21GB respectively.
   
   - Schema for lineitem 
   ![Screenshot from 2020-04-12 
17-56-05](https://user-images.githubusercontent.com/20902425/79073448-f1d7de80-7ce6-11ea-80ea-f8a13ad960bc.png)
   
   
   - We have another version of lineitem with an additional column, last_updated, containing a generated timestamp to use as PRECOMBINE_FIELD_OPT_KEY (a sketch of the corresponding write options follows this list).
   
   - Increments are 3% to 5% of the original base table count: 30% updates and 70% inserts.
   
   - We have 2 versions of the generated updates: one touches only the last quarter of the year, and the other is generated randomly so that the updates touch most of the parquet parts.
   
   - Currently we generate no duplicates for the base table and increments.
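
   A rough sketch of the key-related write options for this variant of lineitem (the composite key columns l_orderkey and l_linenumber are an assumption on my side, and the helper class is just for illustration):

   ```java
   import java.util.HashMap;
   import java.util.Map;

   import org.apache.hudi.DataSourceWriteOptions;

   public class LineitemKeyOptions {
       // Assumed composite key (l_orderkey + l_linenumber) and the generated
       // last_updated column as the precombine field.
       public static Map<String, String> keyOptions() {
           Map<String, String> opts = new HashMap<>();
           opts.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "l_orderkey,l_linenumber");
           opts.put(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "last_updated");
           // A composite key also needs a complex key generator configured via
           // DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(); the class name depends on the Hudi version.
           return opts;
       }
   }
   ```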
   
   Based on the same test data we have made further attempts by modifying HoodieSparkSqlWriter itself, UserDefinedBulkInsertPartitioner, HoodieRecord, and by changing the payload. I will share the test results for defining a UserDefinedBulkInsertPartitioner, because it looks similar to the approach you mentioned to avoid the sorting for bulk-insert:
   
   Applying bulk-insert for **lineitem** generated from tpch 1GB:
   **Table**            : 213MB parquet, 6M records, 16 columns, key is a composite of 2 columns.
   **Spark Conf**  : 1 executor, 12 cores, 16GB, 32 shuffle partitions, 32 bulk-insert parallelism.
   **Hudi Version**: % "hudi-spark-bundle" % "0.5.2-incubating"
   **Spark**           : 2.4.5 with hadoop 2.7
   **Table Type**   : COW
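
   Roughly how that configuration translates into the session and the bulk-insert call (a sketch, not our exact harness; the paths are placeholders and `LineitemKeyOptions` is the illustrative helper from the sketch above):

   ```java
   import org.apache.hudi.DataSourceWriteOptions;
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.SaveMode;
   import org.apache.spark.sql.SparkSession;

   public class LineitemBulkInsert {
       public static void main(String[] args) {
           // Matches the benchmark setup: 1 executor, 12 cores, 16GB, 32 shuffle partitions.
           // Kryo serialization is required by Hudi.
           SparkSession spark = SparkSession.builder()
               .appName("lineitem-bulk-insert")
               .config("spark.executor.instances", "1")
               .config("spark.executor.cores", "12")
               .config("spark.executor.memory", "16g")
               .config("spark.sql.shuffle.partitions", "32")
               .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
               .getOrCreate();

           Dataset<Row> lineitem = spark.read().parquet("/data/tpch-1g/lineitem"); // placeholder path

           lineitem.write()
               .format("org.apache.hudi")
               .option("hoodie.table.name", "lineitem")
               .option(DataSourceWriteOptions.OPERATION_OPT_KEY(),
                       DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())
               .option("hoodie.bulkinsert.shuffle.parallelism", "32")
               .options(LineitemKeyOptions.keyOptions()) // record key + precombine from the sketch above
               .mode(SaveMode.Overwrite)
               .save("/hudi/lineitem"); // placeholder base path
       }
   }
   ```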
   
   - Using the vanilla % "hudi-spark-bundle" % "0.5.2-incubating", the Spark stages are:
   
   ![Screenshot from 2020-04-12 
15-24-45](https://user-images.githubusercontent.com/20902425/79073948-bdb1ed00-7ce9-11ea-8dd0-e12e6075c065.png)
   
   As you can see, the data has been shuffled to disk twice, and the sorting is applied twice as well.
   
   - Eagerly persisting the input RDD before the bulk-insert, so the bulk-insert reuses the sorting applied here:
   
   ```java
   @Override
   public JavaRDD<HoodieRecord> repartitionRecords(JavaRDD records, int outputSparkPartitions) {
       // Sort by partition path + record key into the requested number of partitions,
       // then persist so the sort is done here, before the bulk-insert.
       JavaRDD<HoodieRecord> data = ((JavaRDD<HoodieRecord>) records)
           .sortBy(record -> String.format("%s+%s", record.getPartitionPath(), record.getRecordKey()),
                   true, outputSparkPartitions)
           .persist(StorageLevel.MEMORY_ONLY());
       data.foreach(record -> {}); // materialize the persisted RDD eagerly
       return data;
   }
   ```
   
   ![Screenshot from 2020-04-12 
15-36-01](https://user-images.githubusercontent.com/20902425/79074237-8b08f400-7ceb-11ea-96e6-99bcacb5419f.png)
   
   So we avoided the additional shuffle to disk, yet the performance is still the same, and we can't persist in memory for very large inputs anyway. (A sketch of how the partitioner is plugged into the write follows below.)
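
   For completeness, this is roughly how the partitioner above (call it `SortingBulkInsertPartitioner`; the class name is ours) gets plugged in. We are assuming here that the `hoodie.bulkinsert.user.defined.partitioner.class` write config is honored by the datasource path; as mentioned earlier, we also experimented by modifying HoodieSparkSqlWriter directly.

   ```java
   import org.apache.hudi.DataSourceWriteOptions;
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.SaveMode;

   public class BulkInsertWithCustomPartitioner {
       public static void bulkInsert(Dataset<Row> lineitem, String basePath) {
           lineitem.write()
               .format("org.apache.hudi")
               .option("hoodie.table.name", "lineitem")
               .option(DataSourceWriteOptions.OPERATION_OPT_KEY(),
                       DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())
               // Assumed config key for supplying a UserDefinedBulkInsertPartitioner;
               // the class below is the sort-and-persist implementation shown above.
               .option("hoodie.bulkinsert.user.defined.partitioner.class",
                       "com.example.SortingBulkInsertPartitioner")
               .mode(SaveMode.Overwrite)
               .save(basePath);
       }
   }
   ```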
   
   - Just returning the RDD as it is; optionally we sort and partition the data on the key columns at the dataframe level before passing the dataframe to HoodieSparkSqlWriter, and the partitioner then simply returns the data:
   
   ```java
   @Override
   public JavaRDD<HoodieRecord> repartitionRecords(JavaRDD records, int outputSparkPartitions) {
       // Pass-through: no extra sorting or repartitioning inside the partitioner
       return records;
   }
   ```
   
   ![Screenshot from 2020-04-12 
15-37-56](https://user-images.githubusercontent.com/20902425/79074511-18007d00-7ced-11ea-90b4-40490fd78af6.png)
   
   **Note:** this approach impacts the upsert time significantly, especially if no sorting was applied to the data beforehand; this is probably because the upsert operation then touches most of the parquet parts.
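
   For reference, the dataframe-level sort/partition before handing off to HoodieSparkSqlWriter is essentially the following (the key columns are the assumed composite key, and 32 matches the bulk-insert parallelism above):

   ```java
   import static org.apache.spark.sql.functions.col;

   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;

   public class DataFramePreSort {
       // Cluster the input on the (assumed) key columns so the pass-through
       // partitioner above receives already-sorted data.
       public static Dataset<Row> preSort(Dataset<Row> lineitem) {
           return lineitem
               .repartition(32, col("l_orderkey"))
               .sortWithinPartitions(col("l_orderkey"), col("l_linenumber"));
       }
   }
   ```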
   
   So please correct me if I am wrong: all the Spark stages attached to this thread show that most of the time (the bottleneck) is spent in the count action which filters out the records with an invalid write status, even though that is actually a narrow transformation after the sorting operation.
   We haven't checked the logic behind this action in detail yet; I thought it might be the indexing operation and the creation of the Hudi metadata columns.
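
   To illustrate the lazy-evaluation point (a toy example, not Hudi code): the cost of everything upstream of the count only materializes when the action runs, so the Spark UI attributes it to the count stage.

   ```java
   import java.util.Arrays;

   import org.apache.spark.api.java.JavaRDD;
   import org.apache.spark.api.java.JavaSparkContext;

   public class LazyCountDemo {
       public static void main(String[] args) {
           JavaSparkContext jsc = new JavaSparkContext("local[*]", "lazy-count-demo");

           // map() is lazy; its cost only shows up when count() triggers the chain.
           JavaRDD<Integer> expensive = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5))
               .map(x -> { Thread.sleep(1000); return x * 2; }); // stand-in for the real write work

           long evens = expensive.filter(x -> x % 2 == 0).count(); // time lands on this stage
           System.out.println(evens);
           jsc.stop();
       }
   }
   ```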
   
   
   Replying to the questions and suggestions you mentioned:
   
   > If you are trying to ultimately migrate a table (using bulk_insert once) 
and then do updates/deletes. I suggest, testing upserts/deletes rather than 
bulk_insert.. If you primarily want to do bulk_insert alone to get other 
benefits of Hudi. Happy to work with you more and resolve this. Perf is a major 
push for the next release. So we can def collaborate here
   
   Yes, we need to fully migrate the table to Hudi and apply heavy upsert / insert operations over it later.
   If I understand your suggestion correctly, are you suggesting to initially load the whole table using the upsert or insert operation instead of bulk-insert?
   We have tried that and it was extremely slow, consuming more memory and disk than bulk-insert.
   We also followed the documentation, as well as the guidance in the code, which suggests using bulk-insert for the initial table loading.
   
   > Optimizing the spark executor memory to avoid GC hits and using snappy 
compression
   
   We have actually tried many Spark-related optimizations, yet the performance improvement is limited or there is no gain at all.
