zuyanton opened a new issue #1829:
URL: https://github.com/apache/hudi/issues/1829


   Hudi MoR reading performance gets slower on tables with many (1000+) 
partitions stored in S3. When running simple ```spark.sql("select * from 
table_ro).count``` command, we observe in spark UI  that first 2.5 minutes no 
spark jobs gets scheduled and the load on cluster during that period is almost 
non existing.   
   ![select star 
ro](https://user-images.githubusercontent.com/67354813/87452475-1e391a80-c5b6-11ea-9f63-6e6aa877c20f.PNG)
 
    
   When looking into logs to figure out what is going on during that period we 
observe that first  two and a half minutes Hudi is busy running 
```HoodieParquetInputFormat.listStatus``` [code 
link](https://github.com/apache/hudi/blob/master/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java#L68).
 I placed timer logs lines around various parts of that function and was able 
to narrow down to this line 
https://github.com/apache/hudi/blob/f5dc8ca733014d15a6d7966a5b6ae4308868adfa/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java#L103
    this line execution takes over 2/3 of all time.
   If I understand correctly what this line does it lists all files in a single 
partition.   
   Looks like this "overhead" is linearly depends on number of partitions as 
increasing number of partitions to 2000 almost doubles the overhead and cluster 
just runs ```HoodieParquetInputFormat.listStatus``` before starting executing 
any spark jobs. 
   
   **To Reproduce**
   see code snippet bellow 
   
   * Hudi version : master branch
   
   * Spark version : 2.4.4
   
   * Hive version : 2.3.6
   
   * Hadoop version : 2.8.5
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   ```
       import org.apache.spark.sql.functions._
       import org.apache.hudi.hive.MultiPartKeysValueExtractor
       import org.apache.hudi.QuickstartUtils._
       import scala.collection.JavaConversions._
       import org.apache.spark.sql.SaveMode
       import org.apache.hudi.DataSourceReadOptions._
       import org.apache.hudi.DataSourceWriteOptions._
       import org.apache.hudi.DataSourceWriteOptions
       import org.apache.hudi.config.HoodieWriteConfig._
       import org.apache.hudi.config.HoodieWriteConfig
       import org.apache.hudi.keygen.ComplexKeyGenerator
       import org.apache.hadoop.hive.conf.HiveConf
       val hiveConf = new HiveConf()
       val hiveMetastoreURI = 
hiveConf.get("hive.metastore.uris").replaceAll("thrift://", "")
       val hiveServer2URI = hiveMetastoreURI.substring(0, 
hiveMetastoreURI.lastIndexOf(":"))
       var hudiOptions = Map[String,String](
         HoodieWriteConfig.TABLE_NAME → "testTable1",
         "hoodie.consistency.check.enabled"->"true",
         "hoodie.compact.inline.max.delta.commits"->"100",
         "hoodie.compact.inline"->"true",
         DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ",
         DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "pk",
         DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> 
classOf[ComplexKeyGenerator].getName,
         DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY ->"partition",
         DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "sort_key",
         DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY → "true",
         DataSourceWriteOptions.HIVE_TABLE_OPT_KEY → "testTable1",
         DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY → "partition",
         DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY → 
classOf[MultiPartKeysValueExtractor].getName,
         DataSourceWriteOptions.HIVE_URL_OPT_KEY 
->s"jdbc:hive2://$hiveServer2URI:10000"
       )
   
       spark.sql("drop table if exists testTable1_ro")
       spark.sql("drop table if exists testTable1_rt")
       var seq = Seq((1, 2, 3))
       for (i<- 2 to 1000) {
         seq = seq :+ (i, i , 1)
       }
       var df = seq.toDF("pk", "partition", "sort_key")
       //create table
       
df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testBucket/test/hudi/zuyanton/1/testTable1")
       //update table couple times
       
df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testBucket/test/hudi/zuyanton/1/testTable1")
       
df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testBucket/test/hudi/zuyanton/1/testTable1")
       
       //read table 
       spark.sql("select * from testTable_ro").count
   ```
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to