singhamarjit1995-sys opened a new issue, #18381:
URL: https://github.com/apache/hudi/issues/18381

   ### Bug Description
   
   
   ## **Environment**
   
   *   **Hudi version:** `0.14.0`
   *   **Spark version:** 3.4.x
   *   **Storage:** Parquet
   *   **Table type:** COW
   *   **Read type:** Incremental query
   
   ***
   
   ## **Problem Description**
   
   I am attempting to run delete operations on Hudi tables from Spark by reading data incrementally, filtering rows on a delete condition, and writing the matching rows back with `hoodie.datasource.write.operation=delete`.
   
   This logic works correctly for **non‑partitioned Hudi tables** and has been verified across multiple ETL jobs.
   
   However, the same logic consistently **fails for partitioned Hudi tables**.
   
   ***
   
   ## **Tables Tested**
   
   ### ✅ Works
   
   *   Non‑partitioned tables
   
   ### ❌ Fails
   
   
   All failing tables are **partitioned**.
   
   ***
   
   ## **Reproducible Code Snippet**
   
   ```java
   // Imports used by this snippet
   import org.apache.hudi.DataSourceReadOptions;
   import org.apache.hudi.common.table.HoodieTableConfig;
   import org.apache.hudi.common.table.HoodieTableMetaClient;
   import org.apache.hudi.common.util.Option;
   import org.apache.spark.sql.*;

   HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
           .setConf(new org.apache.hadoop.conf.Configuration())
           .setBasePath(tablePath)
           .build();

   HoodieTableConfig config = metaClient.getTableConfig();

   String recordKey = config.getRecordKeyFieldProp();
   String precombineField = config.getPreCombineField();
   Option<String[]> partitionFields = config.getPartitionFields();
   String partitionPathField = String.join(",", partitionFields.orElse(new String[0]));
   
   // Incremental read
   Dataset<Row> sourceDF = spark.read()
           .format("org.apache.hudi")
           .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(),
                   DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
           .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), "0")
           .load(tablePath);
   
   sourceDF.createOrReplaceTempView("incremental_view");
   
   // Select rows to delete
   Dataset<Row> deleteDF = spark.sql(
           "SELECT * FROM incremental_view WHERE <delete_condition>")
           .withColumn("_hoodie_is_deleted", functions.lit(true));
   
   // Write delete operation
   DataFrameWriter<Row> writer = deleteDF.write()
           .format("hudi")
           .option("hoodie.table.name", config.getTableName())
           .option("hoodie.datasource.write.recordkey.field", recordKey)
           .option("hoodie.datasource.write.precombine.field", precombineField)
           .option("hoodie.datasource.write.operation", "delete")
           .mode(SaveMode.Append);
   
   // Partition handling
   if (!partitionPathField.isEmpty()) {
       writer = writer
           .option("hoodie.datasource.write.partitionpath.field", partitionPathField)
           .option("hoodie.datasource.write.keygenerator.class",
                   "org.apache.hudi.keygen.TimestampBasedKeyGenerator")
           .option("hoodie.keygen.timebased.timestamp.type", "SCALAR")
           .option("hoodie.keygen.timebased.input.dateformat", "yyyy-MM-dd")
           .option("hoodie.keygen.timebased.output.dateformat", "yyyy-MM-dd");
   }
   
   writer.save(tablePath);
   ```
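   As a side note on the snippet above: the partition-handling branch is skipped for non‑partitioned tables because joining an empty partition-field array yields an empty string. A quick, Hudi-free check of that behavior:

   ```java
   // Hudi-free check: String.join over an empty array (what
   // Option.orElse(new String[0]) produces for a non-partitioned table)
   // yields "", so the if (!partitionPathField.isEmpty()) branch — and the
   // TimestampBasedKeyGenerator options — only ever apply to partitioned
   // tables, matching where the failure is observed.
   public class PartitionFieldJoinCheck {
       public static void main(String[] args) {
           String nonPartitioned = String.join(",", new String[0]);
           String partitioned = String.join(",", new String[] {"year", "month"});
           System.out.println(nonPartitioned.isEmpty()); // true
           System.out.println(partitioned);              // year,month
       }
   }
   ```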
   
   ***
   
   ## **Suspected Root Cause**
   
   *   The failure occurs only for **partitioned tables**.
   *   All affected tables have **INTEGER partition columns**, while the stack trace shows Spark reading the partition value through `getUTF8String` — i.e. a mismatch between the declared partition column type and the string value materialized for it.
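   
   The cast failure in the stack trace below can be reproduced in isolation. This is my own minimal, Hudi-free illustration (not project code): a partition value held as `java.lang.Integer` read through a string-typed accessor fails the same way.
   
   ```java
   // Minimal illustration (not Hudi code): the error occurs when a value
   // stored as java.lang.Integer is read through a String-typed path,
   // mirroring "Integer cannot be cast to UTF8String" from the trace.
   public class PartitionTypeMismatchDemo {
       public static void main(String[] args) {
           Object partitionValue = Integer.valueOf(2024); // INT partition column value
           try {
               // Spark's getUTF8String expects string-typed data here
               String asString = (String) partitionValue;
               System.out.println(asString);
           } catch (ClassCastException e) {
               System.out.println("ClassCastException, as in the report");
           }
       }
   }
   ```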
   
   ***
   
   
   
   
   
   ### Logs and Stack Trace
   
   ```text
   Lost task 2.3 in stage 4.0 (TID 18):
   java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.spark.unsafe.types.UTF8String
       at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46)
       at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
       at org.apache.spark.sql.execution.vectorized.ColumnVectorUtils.populate(ColumnVectorUtils.java:76)
       at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:294)
       at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:307)
       at org.apache.spark.sql.execution.datasources.parquet.Spark34LegacyHoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark34LegacyHoodieParquetFileFormat.scala:332)
       at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.readCurrentFile(FileScanRDD.scala:357)
       at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:245)
       at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$2(HoodieSparkUtils.scala:108)
   ```
   

