codejoyan edited a comment on issue #2592:
URL: https://github.com/apache/hudi/issues/2592#issuecomment-785121095


The below succeeds. I removed only the decimal fields and was able to save (a sketch of the removed decimal casts is included after the transcript).
   
```
scala> val inputDF = spark.read.format("csv").option("header", "true").load("hdfs://XXXXXhw00ha/user/joyan/test_data.csv")
inputDF: org.apache.spark.sql.DataFrame = [col_1: string, col_2: string ... 12 more fields]

scala> val formattedDF = inputDF.selectExpr("col_1", "cast(col_2 as integer) col_2",
     | "cast(col_3 as short) col_3", "col_4", "col_5", "cast(col_6 as byte) col_6",
     | "cast(col_9 as timestamp) col_9", "col_10", "cast(col_11 as timestamp) col_11",
     | "col_12", "cntry_cd", "cast(bus_dt as date) bus_dt")
formattedDF: org.apache.spark.sql.DataFrame = [col_1: string, col_2: int ... 10 more fields]

scala> val transformedDF = formattedDF.withColumn("partitionpath", concat(lit("cntry_cd="), col("cntry_cd"), lit("/bus_dt="), col("bus_dt")))
transformedDF: org.apache.spark.sql.DataFrame = [col_1: string, col_2: int ... 11 more fields]

scala> transformedDF.show
+--------------------+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+--------------------+
|               col_1|col_2|col_3|col_4|col_5|col_6|               col_9|     col_10|              col_11|     col_12|cntry_cd|    bus_dt|       partitionpath|
+--------------------+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+--------------------+
|7IN00716079317820...|  716|    3|   AB|  INR| null|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91|      IN|2021-02-01|cntry_cd=IN/bus_d...|
|7IN00716079317820...|  716|    2|   AB|  INR| null|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91|      IN|2021-02-01|cntry_cd=IN/bus_d...|
|7IN00716079317820...|  716|    1|   AB|  INR| null|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91|      IN|2021-02-01|cntry_cd=IN/bus_d...|
|AU700716079381819...| 5700|    5|   AB|  INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91|      IN|2021-02-02|cntry_cd=IN/bus_d...|
|AU700716079381819...| 5700|    6|   AB|  INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91|      IN|2021-02-02|cntry_cd=IN/bus_d...|
|AU700716079381819...| 5700|    4|   AB|  INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91|      AU|2021-02-01|cntry_cd=AU/bus_d...|
|AU700716079381819...| 5700|    3|   AB|  INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91|      AU|2021-02-01|cntry_cd=AU/bus_d...|
|AU700716079381819...| 5700|    1|   AB|  INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91|      AU|2021-02-01|cntry_cd=AU/bus_d...|
+--------------------+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+--------------------+

scala> transformedDF.write.format("org.apache.hudi").
     | options(getQuickstartWriteConfigs).
     | option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col_9").
     | option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "col_2,col_1,col_3").
     | option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     | option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator").
     | option("hoodie.upsert.shuffle.parallelism", "2").
     | option("hoodie.insert.shuffle.parallelism", "2").
     | option(HoodieWriteConfig.TABLE_NAME, "targetTableHudi").
     | mode(SaveMode.Append).
     | save(targetPath)

scala> formattedDF.printSchema
root
 |-- col_1: string (nullable = true)
 |-- col_2: integer (nullable = true)
 |-- col_3: short (nullable = true)
 |-- col_4: string (nullable = true)
 |-- col_5: string (nullable = true)
 |-- col_6: byte (nullable = true)
 |-- col_9: timestamp (nullable = true)
 |-- col_10: string (nullable = true)
 |-- col_11: timestamp (nullable = true)
 |-- col_12: string (nullable = true)
 |-- cntry_cd: string (nullable = true)
 |-- bus_dt: date (nullable = true)

scala> formattedDF.show
+--------------------+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+
|               col_1|col_2|col_3|col_4|col_5|col_6|               col_9|     col_10|              col_11|     col_12|cntry_cd|    bus_dt|
+--------------------+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+
|7IN00716079317820...|  716|    3|   AB|  INR| null|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91|      IN|2021-02-01|
|7IN00716079317820...|  716|    2|   AB|  INR| null|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91|      IN|2021-02-01|
|7IN00716079317820...|  716|    1|   AB|  INR| null|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91|      IN|2021-02-01|
|AU700716079381819...| 5700|    5|   AB|  INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91|      IN|2021-02-02|
|AU700716079381819...| 5700|    6|   AB|  INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91|      IN|2021-02-02|
|AU700716079381819...| 5700|    4|   AB|  INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91|      AU|2021-02-01|
|AU700716079381819...| 5700|    3|   AB|  INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91|      AU|2021-02-01|
|AU700716079381819...| 5700|    1|   AB|  INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91|      AU|2021-02-01|
+--------------------+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+

scala> val testSnapshotDF = spark.read.format("org.apache.hudi").
     | load(targetPath + "/*/*/*")
21/02/24 14:36:27 WARN DefaultSource: Loading Base File Only View.
testSnapshotDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 16 more fields]

scala> testSnapshotDF.createOrReplaceTempView("test_hudi_ro")

scala> spark.sql("select cntry_cd,bus_dt,count(1) from test_hudi_ro group by cntry_cd,bus_dt").show
+--------+----------+--------+
|cntry_cd|    bus_dt|count(1)|
+--------+----------+--------+
|      IN|2021-02-01|       3|
|      AU|2021-02-01|       3|
|      IN|2021-02-02|       2|
+--------+----------+--------+
```
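For reference, this is roughly the failing variant with the decimal casts put back. A minimal sketch, assuming the removed fields are col_7 and col_8 (inferred from the gap in the selectExpr above); the decimal(38,10) precision/scale is a placeholder, not the real schema:

```scala
// Hypothetical sketch: same projection as above, plus the two decimal casts
// that were removed. col_7/col_8 and decimal(38,10) are assumptions; substitute
// the actual field names and precision/scale from the source data.
val formattedWithDecimalsDF = inputDF.selectExpr(
  "col_1", "cast(col_2 as integer) col_2",
  "cast(col_3 as short) col_3", "col_4", "col_5", "cast(col_6 as byte) col_6",
  "cast(col_7 as decimal(38,10)) col_7", "cast(col_8 as decimal(38,10)) col_8",
  "cast(col_9 as timestamp) col_9", "col_10", "cast(col_11 as timestamp) col_11",
  "col_12", "cntry_cd", "cast(bus_dt as date) bus_dt")
```

Writing this DataFrame with the identical Hudi options is the case that fails for me.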

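Since ComplexKeyGenerator is configured, the composite keys can also be sanity-checked on the readback side. A minimal sketch, relying on ComplexKeyGenerator's comma-separated "field:value" key layout:

```scala
// Inspect Hudi's metadata columns to confirm the key and partition-path layout.
// Expected record keys look like "col_2:716,col_1:7IN00716...,col_3:3" and
// partition paths like "cntry_cd=IN/bus_dt=2021-02-01".
testSnapshotDF
  .select("_hoodie_record_key", "_hoodie_partition_path")
  .show(3, false)
```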

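One note on the readback glob: the three levels in targetPath + "/*/*/*" correspond to the two partition directories plus the base files under them. Spelled out, an equivalent sketch under the same targetPath:

```scala
// Same snapshot read with the partition levels made explicit:
// <targetPath>/cntry_cd=<...>/bus_dt=<...>/<base files>
val testSnapshotExplicitDF = spark.read.format("org.apache.hudi")
  .load(targetPath + "/cntry_cd=*/bus_dt=*/*")
```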