codejoyan edited a comment on issue #2592: URL: https://github.com/apache/hudi/issues/2592#issuecomment-785036461
Sure @bvaradar. The steps below will help to reproduce the issue. **Observation:** This fails when the dataframe has decimal datatypes with a specified precision; it succeeds with other datatypes. **Environment Details:** Spark version in HDP - 2.3.0.2.6.5.279-2 Hudi: hudi-spark-bundle_2.11:0.7.0 **Step-1:** Create dummy data ``` $ hdfs dfs -cat hdfs://XXXXXw00ha/user/joyan/test_data.csv col_1,col_2,col_3,col_4,col_5,col_6,col_7,col_8,col_9,col_10,col_11,col_12,cntry_cd,bus_dt 7IN00716079317820210109153408,716,3,AB,INR,107667253,0,1,2021-02-14 20:23:54.753,useridjsb91,2021-02-14 20:23:54.753,useridjsb91,IN,2021-02-01 7IN00716079317820210109153408,716,2,AB,INR,212733302,0,1,2021-02-14 20:23:54.753,useridjsb91,2021-02-14 20:23:54.753,useridjsb91,IN,2021-02-01 7IN00716079317820210109153408,716,1,AB,INR,139224013,0,1,2021-02-14 20:23:54.753,useridjsb91,2021-02-14 20:23:54.753,useridjsb91,IN,2021-02-01 AU700716079381819643325112243,5700,5,AB,INR,136158881,0,1,2021-02-14 20:06:38.718,useridjsb91,2021-02-14 20:06:38.718,useridjsb91,IN,2021-02-02 AU700716079381819643325112243,5700,6,AB,INR,585710940,4,1.97,2021-02-14 20:06:38.718,useridjsb91,2021-02-14 20:06:38.718,useridjsb91,IN,2021-02-02 AU700716079381819643325112243,5700,4,AB,INR,136158881,0,1,2021-02-14 20:06:38.718,useridjsb91,2021-02-14 20:06:38.718,useridjsb91,AU,2021-02-01 AU700716079381819643325112243,5700,3,AB,INR,136158881,0,1,2021-02-14 20:06:38.718,useridjsb91,2021-02-14 20:06:38.718,useridjsb91,AU,2021-02-01 AU700716079381819643325112243,5700,1,AB,INR,136158881,0,1,2021-02-14 20:06:38.718,useridjsb91,2021-02-14 20:06:38.718,useridjsb91,AU,2021-02-01 ``` **Step-2:** Try to read the data from file, cast to appropriate datatypes and save as Hudi table ``` $ spark-shell \ --packages org.apache.hudi:hudi-spark-bundle_2.11:0.7.0,org.apache.spark:spark-avro_2.11:2.4.4,org.apache.avro:avro:1.8.2 \ --conf spark.driver.extraClassPath=/path/org.apache.avro_avro-1.8.2.jar \ --conf 
spark.executor.extraClassPath=/path/org.apache.avro_avro-1.8.2.jar \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs import org.apache.hudi.config.{HoodieCompactionConfig, HoodieStorageConfig, HoodieWriteConfig} import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.functions.{col, concat, lit} val inputDF = spark.read.format("csv").option("header", "true").load("hdfs://XXXXXw00ha/user/joyan/test_data.csv") val formattedDF = inputDF.selectExpr("col_1", "cast(col_2 as integer) col_2", "cast(col_3 as short) col_3", "col_4", "col_5", "cast(col_6 as byte) col_6", "cast(col_7 as decimal(9,2)) col_7", "cast(col_8 as decimal(9,2)) col_8", "cast(col_9 as timestamp) col_9", "col_10", "cast(col_11 as timestamp) col_11", "col_12", "cntry_cd", "cast(bus_dt as date) bus_dt") formattedDF.printSchema root |-- col_1: string (nullable = true) |-- col_2: integer (nullable = true) |-- col_3: short (nullable = true) |-- col_4: string (nullable = true) |-- col_5: string (nullable = true) |-- col_6: byte (nullable = true) |-- col_7: decimal(9,2) (nullable = true) |-- col_8: decimal(9,2) (nullable = true) |-- col_9: timestamp (nullable = true) |-- col_10: string (nullable = true) |-- col_11: timestamp (nullable = true) |-- col_12: string (nullable = true) |-- cntry_cd: string (nullable = true) |-- bus_dt: date (nullable = true) formattedDF.show +--------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----------+------+-----------+--------+------+ | col_1|col_2|col_3|col_4|col_5|col_6|col_7|col_8|col_9| col_10|col_11| col_12|cntry_cd|bus_dt| +--------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----------+------+-----------+--------+------+ |7IN00716079317820...| 716| 3| AB| INR| null| 0.00| 1.00| null|useridjsb91| null|useridjsb91| IN| null| 
|7IN00716079317820...| 716| 2| AB| INR| null| 0.00| 1.00| null|useridjsb91| null|useridjsb91| IN| null| |7IN00716079317820...| 716| 1| AB| INR| null| 0.00| 1.00| null|useridjsb91| null|useridjsb91| IN| null| |AU700716079381819...| 5700| 5| AB| INR| null| 0.00| 1.00| null|useridjsb91| null|useridjsb91| IN| null| |AU700716079381819...| 5700| 6| AB| INR| null| 4.00| 1.97| null|useridjsb91| null|useridjsb91| IN| null| |AU700716079381819...| 5700| 4| AB| INR| null| 0.00| 1.00| null|useridjsb91| null|useridjsb91| AU| null| |AU700716079381819...| 5700| 3| AB| INR| null| 0.00| 1.00| null|useridjsb91| null|useridjsb91| AU| null| |AU700716079381819...| 5700| 1| AB| INR| null| 0.00| 1.00| null|useridjsb91| null|useridjsb91| AU| null| +--------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----------+------+-----------+--------+------+ val transformedDF = formattedDF.withColumn("partitionpath", concat(lit("cntry_cd="), col("cntry_cd"), lit("/bus_dt="), col("bus_dt"))) val targetPath = "gs://XXXXXXXXXXX7bb3d68/test_table_tgt" transformedDF.write.format("org.apache.hudi"). options(getQuickstartWriteConfigs). option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col_9"). option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "col_2,col_1,col_3"). option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator"). option("hoodie.upsert.shuffle.parallelism","2"). option("hoodie.insert.shuffle.parallelism","2"). option(HoodieWriteConfig.TABLE_NAME, "targetTableHudi"). mode(SaveMode.Append). 
save(targetPath) java.lang.NoSuchMethodError: org.apache.spark.sql.types.Decimal$.minBytesForPrecision()[I at org.apache.hudi.spark.org.apache.spark.sql.avro.SchemaConverters$.toAvroType(SchemaConverters.scala:156) at org.apache.hudi.spark.org.apache.spark.sql.avro.SchemaConverters$$anonfun$5.apply(SchemaConverters.scala:176) at org.apache.hudi.spark.org.apache.spark.sql.avro.SchemaConverters$$anonfun$5.apply(SchemaConverters.scala:174) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99) at org.apache.hudi.spark.org.apache.spark.sql.avro.SchemaConverters$.toAvroType(SchemaConverters.scala:174) at org.apache.hudi.AvroConversionUtils$.convertStructTypeToAvroSchema(AvroConversionUtils.scala:52) at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:139) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:134) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225) ... 59 elided ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org