codejoyan commented on issue #2620: URL: https://github.com/apache/hudi/issues/2620#issuecomment-884898614
**Problem Statement:** I am using a COW table and receiving roughly 1 GB of incremental data per batch. Each batch runs data quality checks and an upsert. Attached is the Spark UI stages screenshot: <img width="1512" alt="Screenshot 2021-07-22 at 6 11 33 PM" src="https://user-images.githubusercontent.com/48707638/126643808-dadfce60-1ee2-4c3d-9620-d760af0cd802.png">

- The record key is complex: a composite key of (a, b, c) (string, number, number). There is no timestamp ordering, but we can order by the numeric columns.
- Yes, the dataset is partitioned (regular partitioning). I tried both the regular bloom index and the simple index; performance is better with the simple index.
- Upsert parallelism is the default 1500. The operation is upsert.
- Inserts go into the new partition and updates land predominantly in the latest partitions, but a few updates touch many old partitions too.

**Snapshot Count before the Upsert**

Below is the snapshot view before running the upsert:

```
scala> val svsSnapshotDF = spark.read.format("org.apache.hudi").
     |   load(targetPath + "/*/*/*")

scala> svsSnapshotDF.groupBy("visit_date").count().sort(col("visit_date")).show(false)
21/07/22 11:53:55 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
+----------+---------+
|visit_date|count    |
+----------+---------+
|2021-07-02|266836321|
|2021-07-03|270866302|
|2021-07-04|198333856|
|2021-07-05|212205824|
|2021-07-06|198391165|
|2021-07-07|188043723|
|2021-07-08|445      |
+----------+---------+
```

**Incremental Count after the Upsert:**

```
scala> storeVisitScanSnapshotDF.select(col("_hoodie_commit_time")).distinct.sort($"_hoodie_commit_time".desc).show(false)
+-------------------+
|_hoodie_commit_time|
+-------------------+
|20210721051130     |
|20210721045241     |
|20210721043446     |
|20210720185844     |
|20210720113928     |
|20210720110235     |
|20210720093310     |
|20210720073405     |
|20210720055244     |
|20210720051405     |
|20210720041607     |
|20210719181512     |
|20210719150715     |
|20210719140407     |
|20210719134750     |
|20210719133012     |
|20210719131145     |
|20210719063351     |
|20210719061724     |
+-------------------+

scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._

scala> val beginTime = "20210721051130"
beginTime: String = 20210721051130

scala> val storeVisitScanIncrementalDF = spark.read.format("hudi").
     |   option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
     |   option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
     |   load(targetPath)
storeVisitScanIncrementalDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 125 more fields]

scala> storeVisitScanIncrementalDF.groupBy("visit_date").count().sort(col("visit_date")).show(false)
+----------+-------+
|visit_date|count  |
+----------+-------+
|2021-07-07|2680595|
|2021-07-08|25260  |
+----------+-------+
```

Completed Jobs
<img width="1512" alt="Screenshot 2021-07-22 at 6 11 33 PM" src="https://user-images.githubusercontent.com/48707638/126644087-bc3d2500-df5f-4a38-b27f-52454faa75b6.png">

Details for the Slow Jobs
<img width="1524" alt="Screenshot 2021-07-22 at 6 38 10 PM" src="https://user-images.githubusercontent.com/48707638/126644052-80d164af-b9d1-430b-9c9b-179596f58da2.png"> <img width="1515" alt="Screenshot 2021-07-22 at 6 37 52 PM" src="https://user-images.githubusercontent.com/48707638/126644059-64286adf-637a-455c-a611-1bd31878538d.png">
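For context, a rough sketch of the write path described above (COW table, composite record key, SIMPLE index, upsert parallelism 1500). This is an assumption, not the actual job: `inputDF`, the key columns `a,b,c`, the precombine choice, and the table name are placeholders, and the option constants are the pre-0.9 `DataSourceWriteOptions` names that match the snippets in this comment:

```scala
// Hypothetical reconstruction of the upsert configuration; adjust names to your job.
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig

inputDF.write.format("hudi").
  option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
  option(TABLE_TYPE_OPT_KEY, COW_TABLE_TYPE_OPT_VAL).
  // composite record key (a, b, c) needs the ComplexKeyGenerator
  option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator").
  option(RECORDKEY_FIELD_OPT_KEY, "a,b,c").
  option(PARTITIONPATH_FIELD_OPT_KEY, "visit_date").
  // no timestamp ordering; one of the numeric key columns stands in (placeholder "c")
  option(PRECOMBINE_FIELD_OPT_KEY, "c").
  // SIMPLE index performed better than BLOOM for this workload
  option("hoodie.index.type", "SIMPLE").
  option("hoodie.upsert.shuffle.parallelism", "1500").
  option(HoodieWriteConfig.TABLE_NAME, "my_table").  // placeholder table name
  mode("append").
  save(targetPath)
```

With mostly-latest-partition updates plus a long tail of old partitions, the index choice dominates: BLOOM has to read file footers across every partition the keys might touch, while SIMPLE joins incoming keys against the existing records, which can win when update fan-out is wide.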