ShrutiBansal309 opened a new issue, #10823:
URL: https://github.com/apache/hudi/issues/10823

   **Issue**
   I am using Hudi 0.14.0 and Spark 3.4.0 on an EMR 6.15.0 cluster.
   I have a service that writes a Dataset<Row> to a Hudi table located on S3. When I try to delete data from this table (following the hard-delete approach from https://hudi.apache.org/docs/0.13.1/quick-start-guide#hard-deletes), the delete fails with the error below:
   ```
   java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.spark.unsafe.types.UTF8String
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46) ~[spark-catalyst_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46) ~[spark-catalyst_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195) ~[spark-catalyst_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.sql.execution.vectorized.ColumnVectorUtils.populate(ColumnVectorUtils.java:76) ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:294) ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:307) ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.sql.execution.datasources.parquet.Spark32PlusHoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark32PlusHoodieParquetFileFormat.scala:330) ~[hudi-utilities-bundle_2.12-0.13.1-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:348) ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:412) ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:239) ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:731) ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source) ~[?:?]
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) ~[?:?]
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35) ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source) ~[?:?]
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:959) ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at scala.collection.Iterator.isEmpty(Iterator.scala:387) ~[scala-library-2.12.15.jar:?]
        at scala.collection.Iterator.isEmpty$(Iterator.scala:387) ~[scala-library-2.12.15.jar:?]
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.isEmpty(WholeStageCodegenExec.scala:957) ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$2(HoodieSparkUtils.scala:108) ~[hudi-utilities-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.sql.execution.SQLConfInjectingRDD.compute(SQLConfInjectingRDD.scala:58) ~[hudi-utilities-bundle_2.12-0.13.1-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.scheduler.Task.run(Task.scala:141) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) ~[spark-core_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_402]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_402]
        at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_402]
   ```
   
   According to the debug logs, the error comes from the partition key, which is of DateType in the data but which Hudi appends as StringType when reading the table back. This is what I see in the debug logs:
   ```
   24/02/13 01:44:49 DEBUG Spark32PlusHoodieParquetFileFormat: Appending StructType(StructField(yyyymmdate_prt,StringType,true)) [19358]
   ```
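   
   To make the mismatch easier to see, here is a minimal check (my own illustrative sketch, not part of the failing job; it assumes the `tablePath` and `deleteDF` values from the reproduction below):
   ```
   // Illustrative only: compare how Spark types the partition column when the Hudi
   // table is read back versus in the DataFrame that is written with the "delete" operation.
   val readBack = spark.read.format("hudi").load(tablePath)
   readBack.printSchema()   // how the partition column is typed on read
   deleteDF.printSchema()   // how it is typed in the DataFrame sent to the delete write
   ```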
   
   **To Reproduce**
   The code below reproduces the issue:
   ```
   sudo spark-shell \
     --jars s3://dmoncloud-voltst-emr-data/testing/patchedbuild/hudi-utilities-bundle_2.12-0.13.1-amzn-0.jar \
     --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
     --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
     --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
   
   //------
   import org.apache.spark.sql.SaveMode
   import org.apache.spark.sql.functions._
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.DataSourceReadOptions
   import org.apache.hudi.config.HoodieWriteConfig
   import org.apache.hudi.hive.MultiPartKeysValueExtractor
   import org.apache.hudi.hive.HiveSyncConfig
   import org.apache.hudi.sync.common.HoodieSyncConfig
   import org.apache.spark.sql.types._
   
   // -------
   
   val s3RootLocation = "s3://dmoncloud-voltst-emr-data/sparkhudi"
   val tableName = "testhudi11"
   val tablePath = s3RootLocation + "/" + tableName
   
   val hudiOptions = Map[String,String](
     "hoodie.table.name" -> tableName,
     "hoodie.datasource.write.table.name" -> tableName,
     "hoodie.insert.shuffle.parallelism" -> "2",
     "hoodie.upsert.shuffle.parallelism" -> "2",
     "hoodie.bulkinsert.shuffle.parallelism" -> "2",
     "hoodie.datasource.write.partitionpath.field" -> "creation_date",
     "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
     "hoodie.datasource.write.recordkey.field" -> "id",
     "hoodie.datasource.write.precombine.field" -> "id",
     
"hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled" -> 
"true",
     "hoodie.datasource.write.keygenerator.class" -> 
"org.apache.hudi.keygen.TimestampBasedKeyGenerator",
     "hoodie.deltastreamer.keygen.timebased.timestamp.type" -> "SCALAR",
     "hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit" -> 
"DAYS",
     "hoodie.deltastreamer.keygen.timebased.input.dateformat" -> "yyyy-MM-dd",
     "hoodie.deltastreamer.keygen.timebased.output.dateformat" -> "yyyy-MM-dd",
     "hoodie.deltastreamer.keygen.timebased.timezone" -> "GMT+8:00",
     "hoodie.datasource.hive_sync.enable" -> "true",
     "hoodie.datasource.write.hive_style_partitioning" -> "true",
     "hoodie.datasource.hive_sync.partition_fields" -> "creation_date",
     "hoodie.datasource.hive_sync.database" -> "default",
     "hoodie.datasource.hive_sync.table" -> tableName,
     "hoodie.datasource.hive_sync.support_timestamp" -> "true",
     "hoodie.datasource.hive_sync.mode" -> "hms",
     "hoodie.metadata.enable" -> "false"
   )
   
   val customSchema = StructType(Array(
     StructField("id", IntegerType, true),
     StructField("creation_date", DateType, true)
   ))
   
   //-------
   
   val df = spark.read.format("csv").option("header", "true").schema(customSchema).load(s3RootLocation + "/input")
   
   (df.write
       .format("hudi")
       .options(hudiOptions)
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"upsert")
       .mode(SaveMode.Append)
       .save(tablePath))
   
   //-------DELETE
   
   val deleteDF=spark.sql("select * from " + tableName)
   
   (deleteDF.write
       .format("hudi")
       .options(hudiOptions)
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"delete")
       .mode(SaveMode.Append)
       .save(tablePath))
   ```
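   
   For reference, a hard delete should only need the record key, precombine, and partition path columns, so a narrower variant of the delete step would look like this (a sketch reusing the `hudiOptions` above; I have not verified whether it behaves any differently):
   ```
   // Sketch: restrict the delete payload to the columns Hudi actually needs.
   // Here "id" is both record key and precombine field, "creation_date" is the partition path.
   val keysToDelete = spark.sql("select id, creation_date from " + tableName)
   
   (keysToDelete.write
       .format("hudi")
       .options(hudiOptions)
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete")
       .mode(SaveMode.Append)
       .save(tablePath))
   ```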
   
   **To Resolve**
   I tried the patch from https://github.com/apache/hudi/pull/9273, but it did not resolve the issue.
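   
   A possible workaround I have not verified: disabling Spark's vectorized Parquet reader (`spark.sql.parquet.enableVectorizedReader` is a standard Spark SQL setting), since the failing cast happens while that reader populates the appended partition column:
   ```
   // Unverified workaround sketch: turn off the vectorized Parquet reader for the session,
   // then re-run the delete write from the reproduction above.
   spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
   ```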
   
   CC: @ad1happy2go 

