nsivabalan commented on a change in pull request #2431: URL: https://github.com/apache/hudi/pull/2431#discussion_r570132309
########## File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala ########## @@ -348,4 +352,141 @@ class TestCOWDataSource extends HoodieClientTestBase { assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) } + + private def getDataFrameWriter(keyGenerator: String): DataFrameWriter[Row] = { + val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList + val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) + + inputDF.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGenerator) + .mode(SaveMode.Overwrite) + } + + @Test def testTranslateSparkParamsToHudiParamsWithCustomKeyGenerator(): Unit = { Review comment: may be we could name the test as "testSparkPartitonByWithCustomKeyGen()" . if this looks ok, you can fix all methods. succinct and conveys the meaning too. ########## File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala ########## @@ -348,4 +352,141 @@ class TestCOWDataSource extends HoodieClientTestBase { assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) } + + private def getDataFrameWriter(keyGenerator: String): DataFrameWriter[Row] = { + val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList + val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) + + inputDF.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGenerator) + .mode(SaveMode.Overwrite) + } + + @Test def testTranslateSparkParamsToHudiParamsWithCustomKeyGenerator(): Unit = { + // Without fieldType, the default is SIMPLE + var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName) + writer.partitionBy("current_ts") + .save(basePath) + + var recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*") + + 
assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("current_ts").cast("string")).count() == 0) + + // Specify fieldType as TIMESTAMP + writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName) + writer.partitionBy("current_ts:TIMESTAMP") + .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS") + .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd") + .save(basePath) + + recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*") + + val udf_date_format = udf((data: Long) => new DateTime(data).toString(DateTimeFormat.forPattern("yyyyMMdd"))) + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count() == 0) + + // Mixed fieldType + writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName) + writer.partitionBy("driver", "rider:SIMPLE", "current_ts:TIMESTAMP") + .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS") + .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd") + .save(basePath) + + recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*/*") + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= + concat(col("driver"), lit("/"), col("rider"), lit("/"), udf_date_format(col("current_ts")))).count() == 0) + + // Test invalid partitionKeyType + writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName) + writer = writer.partitionBy("current_ts:DUMMY") + .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS") + .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd") + try { + writer.save(basePath) + fail("should fail when invalid PartitionKeyType is provided!") + } catch { + case e: Exception => + assertTrue(e.getMessage.contains("No enum constant org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY")) + } + } + + @Test def testTranslateSparkParamsToHudiParamsWithSimpleKeyGenerator() { + // Use the `driver` field as the partition key + var writer = 
getDataFrameWriter(classOf[SimpleKeyGenerator].getName) + writer.partitionBy("driver") + .save(basePath) + + var recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*") + + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("driver")).count() == 0) + + // Use the `driver,rider` field as the partition key, If no such field exists, the default value `default` is used + writer = getDataFrameWriter(classOf[SimpleKeyGenerator].getName) + writer.partitionBy("driver", "rider") Review comment: sorry, may I know how is this simple? we have two fields right? isn't this complex key gen ? ########## File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala ########## @@ -348,4 +351,65 @@ class TestCOWDataSource extends HoodieClientTestBase { assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) } + + @Test def testPartitionByTranslateToPartitionPath() { + val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList + val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) + + val noPartitionPathOpts = commonOpts - DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY + + // partitionBy takes effect + inputDF.write.format("hudi") + .partitionBy("current_date") + .options(noPartitionPathOpts) + .mode(SaveMode.Overwrite) + .save(basePath) + + var recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*") + + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("current_date").cast("string")).count() == 0) + + // PARTITIONPATH_FIELD_OPT_KEY takes effect + inputDF.write.format("hudi") + .partitionBy("current_date") + .options(noPartitionPathOpts) + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp") + .mode(SaveMode.Overwrite) + .save(basePath) + + recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*") + + 
assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("timestamp").cast("string")).count() == 0) + + // CustomKeyGenerator with SIMPLE + inputDF.write.format("hudi") + .partitionBy("current_ts") + .options(noPartitionPathOpts) + .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.CustomKeyGenerator") + .mode(SaveMode.Overwrite) + .save(basePath) + + recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*") + + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("current_ts").cast("string")).count() == 0) + + // CustomKeyGenerator with TIMESTAMP + inputDF.write.format("hudi") + .partitionBy("current_ts") + .options(noPartitionPathOpts) + .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.CustomKeyGenerator") + .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS") + .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd") + .mode(SaveMode.Overwrite) + .save(basePath) + + recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*") + + val udf_date_format = udf((data: Long) => new DateTime(data).toString(DateTimeFormat.forPattern("yyyyMMdd"))) Review comment: this looks good, thanks :) ########## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala ########## @@ -126,12 +134,13 @@ class DefaultSource extends RelationProvider optParams: Map[String, String], df: DataFrame): BaseRelation = { val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams) + val translatedOptOptions = DataSourceWriteOptions.translateSqlOptions(parameters) Review comment: can we name this as translatedOptions ########## File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala ########## @@ -192,6 +194,39 @@ object DataSourceWriteOptions { } } + /** + * Translate spark parameters to hudi parameters + * + * @param optParams Parameters to be 
translated + * @return Parameters after translation + */ + def translateSqlOptions(optParams: Map[String, String]): Map[String, String] = { + var translatedOptParams = optParams + // translate the api partitionBy of spark DataFrameWriter to PARTITIONPATH_FIELD_OPT_KEY + if (optParams.contains(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)) { + val partitionColumns = optParams.get(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY) + .map(SparkDataSourceUtils.decodePartitioningColumns) + .getOrElse(Nil) + val keyGeneratorClass = optParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, + DataSourceWriteOptions.DEFAULT_KEYGENERATOR_CLASS_OPT_VAL) + + val partitionPathField = Review comment: can you please add a comment here that only customKeyGen needs special handling. also, add one simple example. ########## File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala ########## @@ -348,4 +352,141 @@ class TestCOWDataSource extends HoodieClientTestBase { assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) } + + private def getDataFrameWriter(keyGenerator: String): DataFrameWriter[Row] = { + val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList + val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) + + inputDF.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGenerator) + .mode(SaveMode.Overwrite) + } + + @Test def testTranslateSparkParamsToHudiParamsWithCustomKeyGenerator(): Unit = { + // Without fieldType, the default is SIMPLE + var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName) + writer.partitionBy("current_ts") + .save(basePath) + + var recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*") + + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("current_ts").cast("string")).count() == 0) + + // Specify fieldType as TIMESTAMP + 
writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName) + writer.partitionBy("current_ts:TIMESTAMP") + .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS") + .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd") + .save(basePath) + + recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*") + + val udf_date_format = udf((data: Long) => new DateTime(data).toString(DateTimeFormat.forPattern("yyyyMMdd"))) + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count() == 0) + + // Mixed fieldType + writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName) + writer.partitionBy("driver", "rider:SIMPLE", "current_ts:TIMESTAMP") + .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS") + .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd") + .save(basePath) + + recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*/*") + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= + concat(col("driver"), lit("/"), col("rider"), lit("/"), udf_date_format(col("current_ts")))).count() == 0) + + // Test invalid partitionKeyType + writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName) + writer = writer.partitionBy("current_ts:DUMMY") + .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS") + .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd") + try { + writer.save(basePath) + fail("should fail when invalid PartitionKeyType is provided!") + } catch { + case e: Exception => + assertTrue(e.getMessage.contains("No enum constant org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY")) + } + } + + @Test def testTranslateSparkParamsToHudiParamsWithSimpleKeyGenerator() { + // Use the `driver` field as the partition key + var writer = getDataFrameWriter(classOf[SimpleKeyGenerator].getName) + writer.partitionBy("driver") + .save(basePath) + + var recordsReadDF = spark.read.format("org.apache.hudi") 
+ .load(basePath + "/*/*") + + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("driver")).count() == 0) + + // Use the `driver,rider` field as the partition key, If no such field exists, the default value `default` is used + writer = getDataFrameWriter(classOf[SimpleKeyGenerator].getName) + writer.partitionBy("driver", "rider") + .save(basePath) + + recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*") + + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("default")).count() == 0) + } + + @Test def testTranslateSparkParamsToHudiParamsWithComplexKeyGenerator() { + // Use the `driver` field as the partition key + var writer = getDataFrameWriter(classOf[ComplexKeyGenerator].getName) + writer.partitionBy("driver") + .save(basePath) + + var recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*") + + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("driver")).count() == 0) + + // Use the `driver`,`rider` field as the partition key + writer = getDataFrameWriter(classOf[ComplexKeyGenerator].getName) + writer.partitionBy("driver", "rider") + .save(basePath) + + recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*") + + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= concat(col("driver"), lit("/"), col("rider"))).count() == 0) + } + + @Test def testTranslateSparkParamsToHudiParamsWithTimestampBasedKeyGenerator() { + val writer = getDataFrameWriter(classOf[TimestampBasedKeyGenerator].getName) Review comment: I see we have lot of diff options w/ timestampbased. Can you create a follow up ticket. even if not for you, someone will pick it up and add more tests. 
########## File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala ########## @@ -348,4 +352,141 @@ class TestCOWDataSource extends HoodieClientTestBase { assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) } + + private def getDataFrameWriter(keyGenerator: String): DataFrameWriter[Row] = { + val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList + val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) + + inputDF.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGenerator) + .mode(SaveMode.Overwrite) + } + + @Test def testTranslateSparkParamsToHudiParamsWithCustomKeyGenerator(): Unit = { + // Without fieldType, the default is SIMPLE + var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName) + writer.partitionBy("current_ts") + .save(basePath) + + var recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*") + + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("current_ts").cast("string")).count() == 0) + + // Specify fieldType as TIMESTAMP + writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName) + writer.partitionBy("current_ts:TIMESTAMP") + .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS") + .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd") + .save(basePath) + + recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*") + + val udf_date_format = udf((data: Long) => new DateTime(data).toString(DateTimeFormat.forPattern("yyyyMMdd"))) + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count() == 0) + + // Mixed fieldType + writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName) + writer.partitionBy("driver", "rider:SIMPLE", "current_ts:TIMESTAMP") + .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS") + 
.option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd") + .save(basePath) + + recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*/*") + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= + concat(col("driver"), lit("/"), col("rider"), lit("/"), udf_date_format(col("current_ts")))).count() == 0) + + // Test invalid partitionKeyType + writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName) + writer = writer.partitionBy("current_ts:DUMMY") + .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS") + .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd") + try { + writer.save(basePath) + fail("should fail when invalid PartitionKeyType is provided!") + } catch { + case e: Exception => + assertTrue(e.getMessage.contains("No enum constant org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY")) + } + } + + @Test def testTranslateSparkParamsToHudiParamsWithSimpleKeyGenerator() { + // Use the `driver` field as the partition key + var writer = getDataFrameWriter(classOf[SimpleKeyGenerator].getName) + writer.partitionBy("driver") + .save(basePath) + + var recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*") + + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("driver")).count() == 0) + + // Use the `driver,rider` field as the partition key, If no such field exists, the default value `default` is used + writer = getDataFrameWriter(classOf[SimpleKeyGenerator].getName) + writer.partitionBy("driver", "rider") + .save(basePath) + + recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*") + + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("default")).count() == 0) + } + + @Test def testTranslateSparkParamsToHudiParamsWithComplexKeyGenerator() { + // Use the `driver` field as the partition key + var writer = getDataFrameWriter(classOf[ComplexKeyGenerator].getName) + writer.partitionBy("driver") + 
.save(basePath) + + var recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*") + + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("driver")).count() == 0) + + // Use the `driver`,`rider` field as the partition key + writer = getDataFrameWriter(classOf[ComplexKeyGenerator].getName) + writer.partitionBy("driver", "rider") + .save(basePath) + + recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*") + + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= concat(col("driver"), lit("/"), col("rider"))).count() == 0) + } + + @Test def testTranslateSparkParamsToHudiParamsWithTimestampBasedKeyGenerator() { + val writer = getDataFrameWriter(classOf[TimestampBasedKeyGenerator].getName) + writer.partitionBy("current_ts") + .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS") + .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd") + .save(basePath) + + val recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*") + val udf_date_format = udf((data: Long) => new DateTime(data).toString(DateTimeFormat.forPattern("yyyyMMdd"))) + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count() == 0) + } + + @Test def testTranslateSparkParamsToHudiParamsWithGlobalDeleteKeyGenerator() { + val writer = getDataFrameWriter(classOf[GlobalDeleteKeyGenerator].getName) + writer.partitionBy("driver") + .save(basePath) + + val recordsReadDF = spark.read.format("org.apache.hudi") + .load(basePath + "/*") + assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("")).count() == 0) + } + + @Test def testTranslateSparkParamsToHudiParamsWithNonpartitionedKeyGenerator() { + val writer = getDataFrameWriter(classOf[NonpartitionedKeyGenerator].getName) + writer.partitionBy("driver") Review comment: just for my understanding. Can you help me understand, with NonpartitionedKeyGenerator what happens w/ the following a. 
writer.partitionBy("") b. writer.partitionBy("non_existent_column") ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org