[ https://issues.apache.org/jira/browse/HUDI-2287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vinoth Chandar updated HUDI-2287:
---------------------------------
    Priority: Blocker  (was: Major)

> Partition pruning not working on Hudi dataset
> ---------------------------------------------
>
>                 Key: HUDI-2287
>                 URL: https://issues.apache.org/jira/browse/HUDI-2287
>             Project: Apache Hudi
>          Issue Type: Sub-task
>          Components: Performance
>            Reporter: Rajkumar Gunasekaran
>            Assignee: Raymond Xu
>            Priority: Blocker
>             Fix For: 0.10.0
>
> Hi, we have created a Hudi dataset with a two-level partition layout like this:
> {code:java}
> s3://somes3bucket/partition1=value/partition2=value
> {code}
> where _partition1_ and _partition2_ are of type string.
> When running a simple count query using the Hudi format in spark-shell, it takes almost 3 minutes to complete:
> {code:scala}
> spark.read.format("hudi").load("s3://somes3bucket").
>   where("partition1 = 'somevalue' and partition2 = 'somevalue'").
>   count()
>
> res1: Long = ####
> attempt 1: 3.2 minutes
> attempt 2: 2.5 minutes
> {code}
> In the Spark UI, ~9000 tasks (approximately equal to the total number of files in the ENTIRE dataset s3://somes3bucket) are used for the computation. It seems Spark is reading the entire dataset instead of doing *partition pruning*, and only then filtering it based on the where clause.
> Whereas, if I use the parquet format to read the dataset, the query takes only ~30 seconds (vis-a-vis 3 minutes with the Hudi format):
> {code:scala}
> spark.read.parquet("s3://somes3bucket").
>   where("partition1 = 'somevalue' and partition2 = 'somevalue'").
>   count()
>
> res2: Long = ####
> ~30 seconds
> {code}
> In the Spark UI, only 1361 files (i.e. 1361 tasks) are scanned (vis-a-vis ~9000 files with Hudi), and the scan itself takes only ~15 seconds.
> Any idea why partition pruning is not working when using the Hudi format? Wondering if I am missing any configuration during the creation of the dataset?
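> One way to see whether the partition predicates are actually being pushed into the scan (a diagnostic sketch, not a fix; the bucket path and values are the ones from this report) is to inspect the physical plan. When pruning works, Spark's file scan node lists the predicates under PartitionFilters and the partition/file counts drop accordingly:
> {code:scala}
> // Sketch: check the physical plan for pushed-down partition filters.
> // If pruning works, the scan node shows PartitionFilters: [...] and a
> // small PartitionCount; if not, the filter appears only post-scan.
> val df = spark.read.format("hudi").load("s3://somes3bucket")
>   .where("partition1 = 'somevalue' and partition2 = 'somevalue'")
> df.explain(true)
> {code}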
> PS: I ran this query on emr-6.3.0, which has Hudi version 0.7.0, and here is the configuration I used for creating the dataset:
> {code:scala}
> df.writeStream
>   .trigger(Trigger.ProcessingTime(s"${param.triggerTimeInSeconds} seconds"))
>   .partitionBy("partition1", "partition2")
>   .format("org.apache.hudi")
>   .option(HoodieWriteConfig.TABLE_NAME, param.hiveNHudiTableName.get)
>   //--
>   .option(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC, "snappy")
>   .option(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES, param.expectedFileSizeInBytes)
>   .option(HoodieStorageConfig.PARQUET_BLOCK_SIZE_BYTES, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES)
>   //--
>   .option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT_BYTES, (param.expectedFileSizeInBytes / 100) * 80)
>   .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true")
>   .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, param.runCompactionAfterNDeltaCommits.get)
>   //--
>   .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
>   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "record_key_id")
>   .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, classOf[CustomKeyGenerator].getName)
>   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition1:SIMPLE,partition2:SIMPLE")
>   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, hudiTablePrecombineKey)
>   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>   //.option(DataSourceWriteOptions.HIVE_USE_JDBC_OPT_KEY, "false")
>   .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
>   .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "partition1,partition2")
>   .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, param.hiveDb.get)
>   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, param.hiveNHudiTableName.get)
>   .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName)
>   .outputMode(OutputMode.Append())
>   .queryName(s"${param.hiveDb}_${param.hiveNHudiTableName}_query")
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
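> Since the config above enables Hive sync (HIVE_SYNC_ENABLED_OPT_KEY), a possible workaround to compare against (a sketch only, assuming the synced database/table names `somedb.sometable`, which are hypothetical) is to query through the Hive-synced table, where the pruning decision is driven by catalog partition metadata rather than a full file listing of the base path:
> {code:scala}
> // Sketch: count via the Hive-synced table instead of the raw base path.
> // somedb.sometable is a placeholder for param.hiveDb / hiveNHudiTableName.
> spark.sql(
>   """SELECT count(*)
>     |FROM somedb.sometable
>     |WHERE partition1 = 'somevalue' AND partition2 = 'somevalue'""".stripMargin
> ).show()
> {code}
> If this path prunes correctly while the direct load("s3://somes3bucket") path does not, that would point at the datasource read path rather than the table layout.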