sydneyhoran opened a new issue, #8519: URL: https://github.com/apache/hudi/issues/8519
**_Tips before filing an issue_**

- Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
- Join the mailing list to engage in conversations and get faster support at dev-subscr...@hudi.apache.org.
- If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.

**Describe the problem you faced**

We are running Deltastreamer with a `PostgresDebeziumSource`, consuming data from Confluent Kafka and using Confluent Schema Registry as the schema provider. The job runs fine for some time and then suddenly fails with a `NullPointerException`. We believe this is caused by Kafka messages with empty/null values, such as Debezium tombstone records. We do not currently have the ability to modify the Debezium connectors to turn off tombstone records.

We are looking for a way to have Deltastreamer ignore/skip tombstone Kafka messages that contain a null value. Thanks for any input!

**To Reproduce**

Steps to reproduce the behavior:

1. Use the Debezium Kafka connector to publish data from a Postgres server to Kafka.
2. Use `PostgresDebeziumSource` and Confluent Schema Registry to consume the data.
3. The job runs fine for some records and stores the data into files in storage.
4. Run a delete operation on the Postgres DB so that a tombstone record is emitted.
5. After some time the job fails with a `NullPointerException` without much description.

**Expected behavior**

The job should run without errors on empty Kafka message values.

**Environment Description**

* Hudi version : 0.13.0
* Spark version : 3.1
* Hive version : N/A
* Hadoop version : N/A
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker?
(yes/no) : both

**Additional context**

**hoodie configs:**

```
--target-base-path s3a://{{ bucket }}/{{ table_path }}
--target-table {{ table_name }}
--continuous
--props gs://path/to/tablename.properties
--min-sync-interval-seconds 15
--source-ordering-field updated_at
--source-limit 5000
--table-type COPY_ON_WRITE
--op UPSERT
--source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource
--payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload
```

**tablename.properties**

```
hoodie.deltastreamer.schemaprovider.registry.url={{ schema_url }}.confluent.cloud/subjects/{{ topic }}-value/versions/latest
hoodie.deltastreamer.source.kafka.topic=some.topic
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=inserted_at
hoodie.datasource.write.precombine.field=updated_at
schema.registry.url={{ schema_url }}
schema.registry.basic.auth.user.info={{ schema_user }}:{{ schema_key }}
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ kafka_user }}' password='{{ kafka_key }}';
bootstrap.servers={{ bootstrap_server }}
hoodie.embed.timeline.server=false
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
```

**Stacktrace**

```
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 14) (10.253.229.42 executor 1): java.lang.NullPointerException

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2304)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2252)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2252)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2491)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2433)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2422)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2204)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2225)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2244)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
	at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
	at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
	at org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:177)
	at org.apache.hudi.index.simple.HoodieSimpleIndex.fetchRecordLocationsForAffectedPartitions(HoodieSimpleIndex.java:142)
	at org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocationInternal(HoodieSimpleIndex.java:113)
	at org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocation(HoodieSimpleIndex.java:91)
	at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:54)
	at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:36)
	at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
	... 24 more
Caused by: java.lang.NullPointerException
```
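To illustrate what we mean by "skip tombstones": a Debezium tombstone is a Kafka record whose key is set but whose value is null, and conceptually we would like the source to drop those records before deserialization. The sketch below is plain Java, not Hudi internals; the `TombstoneFilter` class and the key/value `Entry` representation are hypothetical stand-ins for the actual Kafka consumer records.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Collectors;

// Hypothetical pre-filter: drop records whose value is null
// (Debezium tombstones) before they reach the Avro deserializer.
public class TombstoneFilter {
    public static <K, V> List<Entry<K, V>> dropTombstones(List<Entry<K, V>> records) {
        return records.stream()
                .filter(r -> r.getValue() != null) // tombstone = non-null key, null value
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Entry<String, String>> batch = List.of(
                new SimpleEntry<>("id-1", "{\"op\":\"c\"}"),
                new SimpleEntry<>("id-1", null),          // tombstone emitted after a delete
                new SimpleEntry<>("id-2", "{\"op\":\"u\"}"));
        System.out.println(dropTombstones(batch).size()); // prints 2
    }
}
```

For reference, the connector-side switch we are currently unable to flip is Debezium's `tombstones.on.delete=false`, which stops the connector from emitting tombstones in the first place.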