yihua commented on code in PR #9273: URL: https://github.com/apache/hudi/pull/9273#discussion_r1272478660
########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala: ########## @@ -941,6 +942,70 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup } } + @ParameterizedTest + @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) + def testSparkPartitionByWithCustomKeyGenerator(recordType: HoodieRecordType): Unit = { + val (writeOpts, readOpts) = getWriterReaderOptsLessPartitionPath(recordType) + // Specify fieldType as TIMESTAMP of type EPOCHMILLISECONDS and output date format as yyyy/MM/dd + var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, writeOpts) + writer.partitionBy("current_ts:TIMESTAMP") + .option(TIMESTAMP_TYPE_FIELD.key, "EPOCHMILLISECONDS") + .option(TIMESTAMP_OUTPUT_DATE_FORMAT.key, "yyyy/MM/dd") + .mode(SaveMode.Overwrite) + .save(basePath) + var recordsReadDF = spark.read.format("org.apache.hudi") + .options(readOpts) + .load(basePath) + val udf_date_format = udf((data: Long) => new DateTime(data).toString(DateTimeFormat.forPattern("yyyy/MM/dd"))) + + assertEquals(0L, recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count()) + + // Mixed fieldType with TIMESTAMP of type EPOCHMILLISECONDS and output date format as yyyy/MM/dd + writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, writeOpts) + writer.partitionBy("driver", "rider:SIMPLE", "current_ts:TIMESTAMP") + .option(TIMESTAMP_TYPE_FIELD.key, "EPOCHMILLISECONDS") + .option(TIMESTAMP_OUTPUT_DATE_FORMAT.key, "yyyy/MM/dd") + .mode(SaveMode.Overwrite) + .save(basePath) + recordsReadDF = spark.read.format("org.apache.hudi") + .options(readOpts) + .load(basePath) + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= + concat(col("driver"), lit("/"), col("rider"), lit("/"), udf_date_format(col("current_ts")))).count() == 0) + } + + @Test + def testPartitionPruningForTimestampBasedKeyGenerator(): Unit = { + val (writeOpts, 
readOpts) = getWriterReaderOptsLessPartitionPath(HoodieRecordType.AVRO, enableFileIndex = true) + val writer = getDataFrameWriter(classOf[TimestampBasedKeyGenerator].getName, writeOpts) + writer.partitionBy("current_ts") + .option(TIMESTAMP_TYPE_FIELD.key, "EPOCHMILLISECONDS") + .option(TIMESTAMP_OUTPUT_DATE_FORMAT.key, "yyyy/MM/dd") + .mode(SaveMode.Overwrite) + .save(basePath) + + val snapshotQueryRes = spark.read.format("hudi") + .options(readOpts) + .load(basePath) + .where("current_ts > '1970/01/16'") + assertTrue(checkPartitionFilters(snapshotQueryRes.queryExecution.executedPlan.toString, "current_ts.* > 1970/01/16")) Review Comment: +1 ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala: ########## @@ -342,6 +342,20 @@ object HoodieFileIndex extends Logging { if (listingModeOverride != null) { properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, listingModeOverride) } + val tableConfig = metaClient.getTableConfig + val partitionColumns = tableConfig.getPartitionFields + if (partitionColumns.isPresent) { + val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName + // NOTE: A custom key generator with multiple fields could have non-encoded slashes in the partition columns' + // value. We might not be able to properly parse partition-values from the listed partition-paths. Fallback + // to eager listing in this case. + val isCustomKeyGenerator = (classOf[CustomKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName) + || classOf[CustomAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) + val hasMultiplePartitionFields = partitionColumns.get().length > 1 + if (hasMultiplePartitionFields && isCustomKeyGenerator) { + properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, DataSourceReadOptions.FILE_INDEX_LISTING_MODE_EAGER) + } + } Review Comment: nit: only execute this part if `listingModeOverride` is lazy? 
########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala: ########## @@ -941,6 +942,70 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup } } + @ParameterizedTest + @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) + def testSparkPartitionByWithCustomKeyGenerator(recordType: HoodieRecordType): Unit = { + val (writeOpts, readOpts) = getWriterReaderOptsLessPartitionPath(recordType) + // Specify fieldType as TIMESTAMP of type EPOCHMILLISECONDS and output date format as yyyy/MM/dd + var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, writeOpts) + writer.partitionBy("current_ts:TIMESTAMP") + .option(TIMESTAMP_TYPE_FIELD.key, "EPOCHMILLISECONDS") + .option(TIMESTAMP_OUTPUT_DATE_FORMAT.key, "yyyy/MM/dd") + .mode(SaveMode.Overwrite) + .save(basePath) + var recordsReadDF = spark.read.format("org.apache.hudi") + .options(readOpts) + .load(basePath) + val udf_date_format = udf((data: Long) => new DateTime(data).toString(DateTimeFormat.forPattern("yyyy/MM/dd"))) + + assertEquals(0L, recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count()) + + // Mixed fieldType with TIMESTAMP of type EPOCHMILLISECONDS and output date format as yyyy/MM/dd + writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, writeOpts) + writer.partitionBy("driver", "rider:SIMPLE", "current_ts:TIMESTAMP") + .option(TIMESTAMP_TYPE_FIELD.key, "EPOCHMILLISECONDS") + .option(TIMESTAMP_OUTPUT_DATE_FORMAT.key, "yyyy/MM/dd") + .mode(SaveMode.Overwrite) + .save(basePath) + recordsReadDF = spark.read.format("org.apache.hudi") Review Comment: nit: use `.format("hudi")`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org. For queries about this service, please contact Infrastructure at: users@infra.apache.org