codejoyan edited a comment on issue #2592:
URL: https://github.com/apache/hudi/issues/2592#issuecomment-785036461


   Sure @bvaradar. The steps below reproduce the issue.
   **Observation:** The write fails when the DataFrame contains decimal columns with a declared precision (here decimal(9,2)); with other datatypes the same write succeeds.
   
   **Environment Details:**
   Spark version in HDP - 2.3.0.2.6.5.279-2
   Hudi: hudi-spark-bundle_2.11:0.7.0
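
   To check the observation above in isolation, a stripped-down write with a single decimal(9,2) column can be tried before walking through the full steps. This is only a sketch, not part of the original run: the table names and /tmp paths are made up, the write options mirror Step-2, and it relies on spark-shell's implicits for `toDF`.

   ```
   // Sketch only: a single decimal(9,2) column should be enough to hit the error
   // on this stack, while the same frame without it should write cleanly
   // (per the observation above). Table names and paths are hypothetical.
   import org.apache.spark.sql.SaveMode
   import org.apache.hudi.config.HoodieWriteConfig

   val df = Seq(("k1", "2021-02-01", "1.97")).toDF("id", "dt", "amt").
     selectExpr("id", "dt", "cast(amt as decimal(9,2)) amt")

   // Expected to fail with the NoSuchMethodError shown in Step-2.
   df.write.format("org.apache.hudi").
     option("hoodie.datasource.write.recordkey.field", "id").
     option("hoodie.datasource.write.precombine.field", "dt").
     option("hoodie.datasource.write.partitionpath.field", "dt").
     option(HoodieWriteConfig.TABLE_NAME, "decimal_repro").
     mode(SaveMode.Overwrite).
     save("/tmp/decimal_repro")

   // Expected to succeed once the decimal column is dropped.
   df.drop("amt").write.format("org.apache.hudi").
     option("hoodie.datasource.write.recordkey.field", "id").
     option("hoodie.datasource.write.precombine.field", "dt").
     option("hoodie.datasource.write.partitionpath.field", "dt").
     option(HoodieWriteConfig.TABLE_NAME, "decimal_repro_ok").
     mode(SaveMode.Overwrite).
     save("/tmp/decimal_repro_ok")
   ```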
   
   **Step-1:** Create dummy data
   
   ```
   $ hdfs dfs -cat hdfs://XXXXXw00ha/user/joyan/test_data.csv
   col_1,col_2,col_3,col_4,col_5,col_6,col_7,col_8,col_9,col_10,col_11,col_12,cntry_cd,bus_dt
   7IN00716079317820210109153408,716,3,AB,INR,107667253,0,1,2021-02-1420:23:54.753,useridjsb91,2021-02-1420:23:54.753,useridjsb91,IN,'2021-02-01'
   7IN00716079317820210109153408,716,2,AB,INR,212733302,0,1,2021-02-1420:23:54.753,useridjsb91,2021-02-1420:23:54.753,useridjsb91,IN,'2021-02-01'
   7IN00716079317820210109153408,716,1,AB,INR,139224013,0,1,2021-02-1420:23:54.753,useridjsb91,2021-02-1420:23:54.753,useridjsb91,IN,'2021-02-01'
   AU700716079381819643325112243,5700,5,AB,INR,136158881,0,1,2021-02-1420:06:38.718,useridjsb91,2021-02-1420:06:38.718,useridjsb91,IN,'2021-02-02'
   AU700716079381819643325112243,5700,6,AB,INR,585710940,4,1.97,2021-02-1420:06:38.718,useridjsb91,2021-02-1420:06:38.718,useridjsb91,IN,'2021-02-02'
   AU700716079381819643325112243,5700,4,AB,INR,136158881,0,1,2021-02-1420:06:38.718,useridjsb91,2021-02-1420:06:38.718,useridjsb91,AU,'2021-02-01'
   AU700716079381819643325112243,5700,3,AB,INR,136158881,0,1,2021-02-1420:06:38.718,useridjsb91,2021-02-1420:06:38.718,useridjsb91,AU,'2021-02-01'
   AU700716079381819643325112243,5700,1,AB,INR,136158881,0,1,2021-02-1420:06:38.718,useridjsb91,2021-02-1420:06:38.718,useridjsb91,AU,'2021-02-01'
   ```
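
   If staging the CSV on HDFS is inconvenient, an equivalent DataFrame can also be built directly in the shell and used in place of the `spark.read` call in Step-2. This is just a sketch covering two of the rows above and relies on spark-shell's implicits for `toDF`.

   ```
   // Sketch: build a small in-memory equivalent of test_data.csv (two rows only),
   // with all columns as strings, just like the header-based CSV read.
   val rows = Seq(
     ("7IN00716079317820210109153408", "716", "3", "AB", "INR", "107667253", "0", "1",
      "2021-02-1420:23:54.753", "useridjsb91", "2021-02-1420:23:54.753", "useridjsb91", "IN", "'2021-02-01'"),
     ("AU700716079381819643325112243", "5700", "6", "AB", "INR", "585710940", "4", "1.97",
      "2021-02-1420:06:38.718", "useridjsb91", "2021-02-1420:06:38.718", "useridjsb91", "IN", "'2021-02-02'"))

   val inputDF = rows.toDF("col_1", "col_2", "col_3", "col_4", "col_5", "col_6", "col_7",
     "col_8", "col_9", "col_10", "col_11", "col_12", "cntry_cd", "bus_dt")
   ```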
   
   **Step-2:** Read the data from the file, cast the columns to the appropriate datatypes, and save as a Hudi table
   ```
   $ spark-shell \
   --packages org.apache.hudi:hudi-spark-bundle_2.11:0.7.0,org.apache.spark:spark-avro_2.11:2.4.4,org.apache.avro:avro:1.8.2 \
   --conf spark.driver.extraClassPath=/path/org.apache.avro_avro-1.8.2.jar \
   --conf spark.executor.extraClassPath=/path/org.apache.avro_avro-1.8.2.jar \
   --conf "spark.sql.hive.convertMetastoreParquet=false" \
   --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
   
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs
   import org.apache.hudi.config.{HoodieCompactionConfig, HoodieStorageConfig, HoodieWriteConfig}
   import org.apache.spark.sql.{SaveMode, SparkSession}
   import org.apache.spark.sql.functions.{col, concat, lit}
   
   val inputDF = spark.read.format("csv").option("header", "true").load("hdfs://XXXXXw00ha/user/joyan/test_data.csv")
   
   val formattedDF = inputDF.selectExpr("col_1", "cast(col_2 as integer) col_2",
        "cast(col_3 as short) col_3", "col_4", "col_5", "cast(col_6 as byte) col_6",
        "cast(col_7 as decimal(9,2)) col_7", "cast(col_8 as decimal(9,2)) col_8",
        "cast(col_9 as timestamp) col_9", "col_10", "cast(col_11 as timestamp) col_11",
        "col_12", "cntry_cd", "cast(bus_dt as timestamp) bus_dt")
   
   formattedDF.printSchema
   root
    |-- col_1: string (nullable = true)
    |-- col_2: integer (nullable = true)
    |-- col_3: short (nullable = true)
    |-- col_4: string (nullable = true)
    |-- col_5: string (nullable = true)
    |-- col_6: byte (nullable = true)
    |-- col_7: decimal(9,2) (nullable = true)
    |-- col_8: decimal(9,2) (nullable = true)
    |-- col_9: timestamp (nullable = true)
    |-- col_10: string (nullable = true)
    |-- col_11: timestamp (nullable = true)
    |-- col_12: string (nullable = true)
    |-- cntry_cd: string (nullable = true)
    |-- bus_dt: timestamp (nullable = true)
   
   
   formattedDF.show
   +--------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----------+------+-----------+--------+------+
   |               col_1|col_2|col_3|col_4|col_5|col_6|col_7|col_8|col_9|     col_10|col_11|     col_12|cntry_cd|bus_dt|
   +--------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----------+------+-----------+--------+------+
   |7IN00716079317820...|  716|    3|   AB|  INR| null| 0.00| 1.00| null|useridjsb91|  null|useridjsb91|      IN|  null|
   |7IN00716079317820...|  716|    2|   AB|  INR| null| 0.00| 1.00| null|useridjsb91|  null|useridjsb91|      IN|  null|
   |7IN00716079317820...|  716|    1|   AB|  INR| null| 0.00| 1.00| null|useridjsb91|  null|useridjsb91|      IN|  null|
   |AU700716079381819...| 5700|    5|   AB|  INR| null| 0.00| 1.00| null|useridjsb91|  null|useridjsb91|      IN|  null|
   |AU700716079381819...| 5700|    6|   AB|  INR| null| 4.00| 1.97| null|useridjsb91|  null|useridjsb91|      IN|  null|
   |AU700716079381819...| 5700|    4|   AB|  INR| null| 0.00| 1.00| null|useridjsb91|  null|useridjsb91|      AU|  null|
   |AU700716079381819...| 5700|    3|   AB|  INR| null| 0.00| 1.00| null|useridjsb91|  null|useridjsb91|      AU|  null|
   |AU700716079381819...| 5700|    1|   AB|  INR| null| 0.00| 1.00| null|useridjsb91|  null|useridjsb91|      AU|  null|
   +--------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----------+------+-----------+--------+------+
   
   val transformedDF = formattedDF.withColumn("partitionpath", concat(lit("cntry_cd="), col("cntry_cd"), lit("/bus_dt="), col("bus_dt")))
   
   val targetPath = "gs://XXXXXXXXXXX7bb3d68/test_table_tgt"
   
   transformedDF.write.format("org.apache.hudi").
   options(getQuickstartWriteConfigs).
   option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col_9").
   option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "col_2,col_1,col_3").
   option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
   option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator").
   option("hoodie.upsert.shuffle.parallelism","2").
   option("hoodie.insert.shuffle.parallelism","2").
   option(HoodieWriteConfig.TABLE_NAME, "targetTableHudi").
   mode(SaveMode.Append).
   save(targetPath)
   
   java.lang.NoSuchMethodError: org.apache.spark.sql.types.Decimal$.minBytesForPrecision()[I
     at org.apache.hudi.spark.org.apache.spark.sql.avro.SchemaConverters$.toAvroType(SchemaConverters.scala:156)
     at org.apache.hudi.spark.org.apache.spark.sql.avro.SchemaConverters$$anonfun$5.apply(SchemaConverters.scala:176)
     at org.apache.hudi.spark.org.apache.spark.sql.avro.SchemaConverters$$anonfun$5.apply(SchemaConverters.scala:174)
     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
     at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
     at org.apache.hudi.spark.org.apache.spark.sql.avro.SchemaConverters$.toAvroType(SchemaConverters.scala:174)
     at org.apache.hudi.AvroConversionUtils$.convertStructTypeToAvroSchema(AvroConversionUtils.scala:52)
     at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:139)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:134)
     at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
     at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
     at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
     at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
     at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
     at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
     at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
     ... 59 elided
   ```
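
   For what it's worth, the failing call is `org.apache.spark.sql.types.Decimal$.minBytesForPrecision`, invoked from the `SchemaConverters` shaded into the Hudi bundle (the relocated `org.apache.hudi.spark.org.apache.spark.sql.avro` prefix in the trace), which appears to be compiled against a newer Spark than the 2.3 build on this cluster. A quick reflection check in the same shell (sketch only) shows whether the running Spark exposes that member:

   ```
   // Sketch: list any "minBytes*" members on the Decimal companion object of the
   // Spark actually on the classpath, and print the running Spark version.
   // An empty list is consistent with the NoSuchMethodError above.
   val decimalCompanion = Class.forName("org.apache.spark.sql.types.Decimal$")
   val minBytesMembers = decimalCompanion.getMethods.map(_.getName).filter(_.toLowerCase.contains("minbytes"))
   println(minBytesMembers.mkString(", "))
   println(sc.version)
   ```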

