Hi,

We have a use case for soft-deleting certain record keys: we nullify the
non-key fields and ignore any later updates to the record. To make it easy
to identify soft-deleted records, we thought of adding a Hudi-style meta
field, "_hoodie_is_soft_deleted", similar to the field Hudi uses for hard
deletes (_hoodie_is_deleted). However, I get an Avro "field not found"
exception when we perform another soft delete on the same index. Please let
me know if you have any advice on how to fix it, or whether this is the
wrong approach. We want to avoid adding an extra field to the customer
schema and instead filter out soft-deleted records behind the scenes, as is
done for hard deletes, while still keeping the records in the system.
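
For illustration, the "nullify non-key fields" step described above could be sketched roughly as follows. The object name, the function name, and the keyCols parameter are hypothetical; keyCols is assumed to hold every column that must survive (record key, partition path, precombine field), and column names are assumed to be top-level.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit}

object SoftDeleteSketch {
  // Hypothetical helper: keeps the columns named in keyCols and replaces
  // every other column with a typed null, preserving column order and types.
  def nullifyNonKeyFields(df: DataFrame, keyCols: Set[String]): DataFrame = {
    val projected: Seq[Column] = df.schema.fields.toSeq.map { field =>
      if (keyCols.contains(field.name)) col(s"`${field.name}`")
      else lit(null).cast(field.dataType).as(field.name)
    }
    df.select(projected: _*)
  }
}
```

Casting the null literal to each field's original type keeps the column types of the outgoing batch the same as those of the incoming one, although previously non-nullable columns become nullable.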


Hudi : 0.8.0
Exception stacktrace:

2/07/14 22:08:21 WARN TaskSetManager: Lost task 5.0 in stage 93.0 (TID
33283, 172.25.31.77, executor 3):
org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType
UPDATE for partition :5
  at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:288)
  at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:139)
  at
org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
  at
org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
  at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
  at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
  at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
  at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
  at
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
  at
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
  at
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
  at
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
  at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:123)
  at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException:
org.apache.hudi.exception.HoodieException:
java.util.concurrent.ExecutionException:
org.apache.hudi.exception.HoodieException: operation has failed
  at
org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:102)
  at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:317)
  at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:308)
  at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:281)
  ... 30 more
Caused by: org.apache.hudi.exception.HoodieException:
java.util.concurrent.ExecutionException:
org.apache.hudi.exception.HoodieException: operation has failed
  at
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:143)
  at
org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
  ... 33 more
Caused by: java.util.concurrent.ExecutionException:
org.apache.hudi.exception.HoodieException: operation has failed
  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
  at
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
  ... 34 more
Caused by: org.apache.hudi.exception.HoodieException: operation has failed
  at
org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:247)
  at
org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
  at
org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
  at
org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:277)
  at
org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
  at
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  ... 3 more
Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field '_hoodie_is_soft_deleted' not found
  at
org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:225)
  at
org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:130)
  at
org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
  at
org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
  at
org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
  at
org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
  at
org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
  at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
  at
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
  at
org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
  at
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  ... 4 more

This is how we add the column to the Spark DataFrame:

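import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.{lit, udf}

object SoftDeleteColInfo {
  val softDeleteHudiMetaCol = "_hoodie_is_soft_deleted"
  val softDeleteStrVal = "true"

  // Identity function: returns the marker string it is given.
  def softDeleted(): String => String = (arg: String) => arg

  val softDeletedUDF = udf(softDeleted())
}

sparkSession.udf.register("softDeletedUDF", SoftDeleteColInfo.softDeletedUDF)
// Adds the marker as a string column whose value is "true".
df.withColumn(SoftDeleteColInfo.softDeleteHudiMetaCol,
  functions.callUDF("softDeletedUDF", lit(SoftDeleteColInfo.softDeleteStrVal)))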
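
As a side note on the snippet above, a constant column does not need an identity UDF. Below is a minimal sketch that sets the flag with lit directly and uses a boolean rather than the string "true", mirroring the boolean type of _hoodie_is_deleted. The object and function names are hypothetical, and the sketch does not settle whether Hudi 0.8.0 accepts a custom "_hoodie_"-prefixed column.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

object SoftDeleteFlag {
  val softDeleteHudiMetaCol = "_hoodie_is_soft_deleted"

  // Hypothetical helper: adds (or overwrites) the flag as a BooleanType
  // column holding the same constant value for every row in the batch.
  def withSoftDeleteFlag(df: DataFrame, softDeleted: Boolean): DataFrame =
    df.withColumn(softDeleteHudiMetaCol, lit(softDeleted))
}
```

Calling SoftDeleteFlag.withSoftDeleteFlag(df, softDeleted = true) marks a soft-delete batch. Calling it with false on ordinary upserts would give every batch the same set of columns, if that is what the table is meant to carry.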
