codejoyan commented on issue #2592: URL: https://github.com/apache/hudi/issues/2592#issuecomment-785121095
The following succeeds. After removing only the decimal fields, I was able to save the data: ``` scala> val inputDF = spark.read.format("csv").option("header", "true").load("hdfs://finstrnhw00ha/user/j0s0j7j/test_data.csv") inputDF: org.apache.spark.sql.DataFrame = [col_1: string, col_2: string ... 12 more fields] scala> val formattedDF = inputDF.selectExpr("col_1", "cast(col_2 as integer) col_2", | "cast(col_3 as short) col_3", "col_4", "col_5", "cast(col_6 as byte) col_6", | "cast(col_9 as timestamp) col_9", "col_10", "cast(col_11 as timestamp) col_11", | "col_12", "cntry_cd", "cast(bus_dt as date) bus_dt") formattedDF: org.apache.spark.sql.DataFrame = [col_1: string, col_2: int ... 10 more fields] scala> val transformedDF = formattedDF.withColumn("partitionpath", concat(lit("cntry_cd="), col("cntry_cd"), lit("/bus_dt="), col("bus_dt"))) transformedDF: org.apache.spark.sql.DataFrame = [col_1: string, col_2: int ... 11 more fields] scala> transformedDF.show +--------------------+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+--------------------+ | col_1|col_2|col_3|col_4|col_5|col_6| col_9| col_10| col_11| col_12|cntry_cd| bus_dt| partitionpath| +--------------------+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+--------------------+ |7IN00716079317820...| 716| 3| AB| INR| null|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91| IN|2021-02-01|cntry_cd=IN/bus_d...| |7IN00716079317820...| 716| 2| AB| INR| null|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91| IN|2021-02-01|cntry_cd=IN/bus_d...| |7IN00716079317820...| 716| 1| AB| INR| null|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91| IN|2021-02-01|cntry_cd=IN/bus_d...| |AU700716079381819...| 5700| 5| AB| INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91| IN|2021-02-02|cntry_cd=IN/bus_d...| |AU700716079381819...| 5700| 
6| AB| INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91| IN|2021-02-02|cntry_cd=IN/bus_d...| |AU700716079381819...| 5700| 4| AB| INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91| AU|2021-02-01|cntry_cd=AU/bus_d...| |AU700716079381819...| 5700| 3| AB| INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91| AU|2021-02-01|cntry_cd=AU/bus_d...| |AU700716079381819...| 5700| 1| AB| INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91| AU|2021-02-01|cntry_cd=AU/bus_d...| +--------------------+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+--------------------+ scala> transformedDF.write.format("org.apache.hudi"). | options(getQuickstartWriteConfigs). | option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col_9"). | option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "col_2,col_1,col_3"). | option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator"). | option("hoodie.upsert.shuffle.parallelism","2"). | option("hoodie.insert.shuffle.parallelism","2"). | option(HoodieWriteConfig.TABLE_NAME, "targetTableHudi"). | mode(SaveMode.Append). 
| save(targetPath) scala> formattedDF.printSchema root |-- col_1: string (nullable = true) |-- col_2: integer (nullable = true) |-- col_3: short (nullable = true) |-- col_4: string (nullable = true) |-- col_5: string (nullable = true) |-- col_6: byte (nullable = true) |-- col_9: timestamp (nullable = true) |-- col_10: string (nullable = true) |-- col_11: timestamp (nullable = true) |-- col_12: string (nullable = true) |-- cntry_cd: string (nullable = true) |-- bus_dt: date (nullable = true) scala> formattedDF.show +--------------------+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+ | col_1|col_2|col_3|col_4|col_5|col_6| col_9| col_10| col_11| col_12|cntry_cd| bus_dt| +--------------------+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+ |7IN00716079317820...| 716| 3| AB| INR| null|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91| IN|2021-02-01| |7IN00716079317820...| 716| 2| AB| INR| null|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91| IN|2021-02-01| |7IN00716079317820...| 716| 1| AB| INR| null|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91| IN|2021-02-01| |AU700716079381819...| 5700| 5| AB| INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91| IN|2021-02-02| |AU700716079381819...| 5700| 6| AB| INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91| IN|2021-02-02| |AU700716079381819...| 5700| 4| AB| INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91| AU|2021-02-01| |AU700716079381819...| 5700| 3| AB| INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91| AU|2021-02-01| |AU700716079381819...| 5700| 1| AB| INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91| AU|2021-02-01| 
+--------------------+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+ scala> val testSnapshotDF = spark.read.format("org.apache.hudi"). | load(targetPath + "/*/*/*") 21/02/24 14:36:27 WARN DefaultSource: Loading Base File Only View. testSnapshotDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 16 more fields] scala> testSnapshotDF.createOrReplaceTempView("test_hudi_ro") scala> spark.sql("select cntry_cd,bus_dt,count(1) from test_hudi_ro group by cntry_cd,bus_dt").show +--------+----------+--------+ |cntry_cd| bus_dt|count(1)| +--------+----------+--------+ | IN|2021-02-01| 3| | AU|2021-02-01| 3| | IN|2021-02-02| 2| +--------+----------+--------+ ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org