yihua commented on code in PR #9273:
URL: https://github.com/apache/hudi/pull/9273#discussion_r1272478660


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -941,6 +942,70 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
     }
   }
 
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
+  def testSparkPartitionByWithCustomKeyGenerator(recordType: 
HoodieRecordType): Unit = {
+    val (writeOpts, readOpts) = 
getWriterReaderOptsLessPartitionPath(recordType)
+    // Specify fieldType as TIMESTAMP of type EPOCHMILLISECONDS and output 
date format as yyyy/MM/dd
+    var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, 
writeOpts)
+    writer.partitionBy("current_ts:TIMESTAMP")
+      .option(TIMESTAMP_TYPE_FIELD.key, "EPOCHMILLISECONDS")
+      .option(TIMESTAMP_OUTPUT_DATE_FORMAT.key, "yyyy/MM/dd")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    var recordsReadDF = spark.read.format("org.apache.hudi")
+      .options(readOpts)
+      .load(basePath)
+    val udf_date_format = udf((data: Long) => new 
DateTime(data).toString(DateTimeFormat.forPattern("yyyy/MM/dd")))
+
+    assertEquals(0L, recordsReadDF.filter(col("_hoodie_partition_path") =!= 
udf_date_format(col("current_ts"))).count())
+
+    // Mixed fieldType with TIMESTAMP of type EPOCHMILLISECONDS and output 
date format as yyyy/MM/dd
+    writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, writeOpts)
+    writer.partitionBy("driver", "rider:SIMPLE", "current_ts:TIMESTAMP")
+      .option(TIMESTAMP_TYPE_FIELD.key, "EPOCHMILLISECONDS")
+      .option(TIMESTAMP_OUTPUT_DATE_FORMAT.key, "yyyy/MM/dd")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    recordsReadDF = spark.read.format("org.apache.hudi")
+      .options(readOpts)
+      .load(basePath)
+    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!=
+      concat(col("driver"), lit("/"), col("rider"), lit("/"), 
udf_date_format(col("current_ts")))).count() == 0)
+  }
+
+  @Test
+  def testPartitionPruningForTimestampBasedKeyGenerator(): Unit = {
+    val (writeOpts, readOpts) = 
getWriterReaderOptsLessPartitionPath(HoodieRecordType.AVRO, enableFileIndex = 
true)
+    val writer = 
getDataFrameWriter(classOf[TimestampBasedKeyGenerator].getName, writeOpts)
+    writer.partitionBy("current_ts")
+      .option(TIMESTAMP_TYPE_FIELD.key, "EPOCHMILLISECONDS")
+      .option(TIMESTAMP_OUTPUT_DATE_FORMAT.key, "yyyy/MM/dd")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val snapshotQueryRes = spark.read.format("hudi")
+      .options(readOpts)
+      .load(basePath)
+      .where("current_ts > '1970/01/16'")
+    
assertTrue(checkPartitionFilters(snapshotQueryRes.queryExecution.executedPlan.toString,
 "current_ts.* > 1970/01/16"))

Review Comment:
   +1



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -342,6 +342,20 @@ object HoodieFileIndex extends Logging {
     if (listingModeOverride != null) {
       
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key,
 listingModeOverride)
     }
+    val tableConfig = metaClient.getTableConfig
+    val partitionColumns = tableConfig.getPartitionFields
+    if (partitionColumns.isPresent) {
+      val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName
+      // NOTE: A custom key generator with multiple fields could have 
non-encoded slashes in the partition columns'
+      //       value. We might not be able to properly parse partition-values 
from the listed partition-paths. Fallback
+      //       to eager listing in this case.
+      val isCustomKeyGenerator = 
(classOf[CustomKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
+        || 
classOf[CustomAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName))
+      val hasMultiplePartitionFields = partitionColumns.get().length > 1
+      if (hasMultiplePartitionFields && isCustomKeyGenerator) {
+        
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key,
 DataSourceReadOptions.FILE_INDEX_LISTING_MODE_EAGER)
+      }
+    }

Review Comment:
   nit: only execute this part if `listingModeOverride` is lazy?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -941,6 +942,70 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
     }
   }
 
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
+  def testSparkPartitionByWithCustomKeyGenerator(recordType: 
HoodieRecordType): Unit = {
+    val (writeOpts, readOpts) = 
getWriterReaderOptsLessPartitionPath(recordType)
+    // Specify fieldType as TIMESTAMP of type EPOCHMILLISECONDS and output 
date format as yyyy/MM/dd
+    var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, 
writeOpts)
+    writer.partitionBy("current_ts:TIMESTAMP")
+      .option(TIMESTAMP_TYPE_FIELD.key, "EPOCHMILLISECONDS")
+      .option(TIMESTAMP_OUTPUT_DATE_FORMAT.key, "yyyy/MM/dd")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    var recordsReadDF = spark.read.format("org.apache.hudi")
+      .options(readOpts)
+      .load(basePath)
+    val udf_date_format = udf((data: Long) => new 
DateTime(data).toString(DateTimeFormat.forPattern("yyyy/MM/dd")))
+
+    assertEquals(0L, recordsReadDF.filter(col("_hoodie_partition_path") =!= 
udf_date_format(col("current_ts"))).count())
+
+    // Mixed fieldType with TIMESTAMP of type EPOCHMILLISECONDS and output 
date format as yyyy/MM/dd
+    writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, writeOpts)
+    writer.partitionBy("driver", "rider:SIMPLE", "current_ts:TIMESTAMP")
+      .option(TIMESTAMP_TYPE_FIELD.key, "EPOCHMILLISECONDS")
+      .option(TIMESTAMP_OUTPUT_DATE_FORMAT.key, "yyyy/MM/dd")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    recordsReadDF = spark.read.format("org.apache.hudi")

Review Comment:
   nit: use `.format("hudi")`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to