sydneyhoran opened a new issue, #8519:
URL: https://github.com/apache/hudi/issues/8519

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
   
   - Join the mailing list to engage in conversations and get faster support at 
dev-subscr...@hudi.apache.org.
   
   - If you have triaged this as a bug, then file an 
[issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   
   We are using Deltastreamer with a PostgresDebeziumSource and consuming data 
from confluent Kafka, using Confluent Schema Registry as the Schema Provider. 
The job runs fine for some time and then all of a sudden fails with 
NullPointerException.
   
   We believe this to be caused by Kafka messages with empty/null values such 
as with Debezium tombstone records. We do not have the ability to modify the 
Debezium connectors to turn off tombstone records at this time.
   
   Looking for a solution to have Deltastreamer ignore/skip over tombstone 
Kafka messages that contain a null value.
   
   Thanks for any input!
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Use Debezium Kafka Connector to publish data from Postgres server to 
Kafka.
   2. Use PostgresDebeziumSource and Confluent Schema Registry to consume data.
   3. It runs fine for some records and stores the data into files in storage.
   4. Run a delete record operation on the Postgres DB to emit a tombstone 
record.
   5. After some time it fails with Null Pointer Exception without much 
description.
   
   **Expected behavior**
   
   The job should run without errors on empty Kafka message values.
   
   **Environment Description**
   
   * Hudi version : 0.13.0
   
   * Spark version : 3.1
   
   * Hive version : N/A
   
   * Hadoop version : N/A
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : both
   
   
   **Additional context**
   
   **hoodie configs:**
   ```
   --target-base-path s3a://{{ bucket }}/{{ table_path }}
   --target-table {{ table_name }}
   --continuous
   --props gs://path/to/tablename.properties
   --min-sync-interval-seconds 15
   --source-ordering-field updated_at
   --source-limit 5000
   --table-type COPY_ON_WRITE
   --op UPSERT
   --source-class 
org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource
   --payload-class 
org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload
   ```
   
   **tablename.properties**
   ```
   hoodie.deltastreamer.schemaprovider.registry.url={{ schema_url 
}}.confluent.cloud/subjects/{{ topic }}-value/versions/latest
   hoodie.deltastreamer.source.kafka.topic=some.topic
   hoodie.datasource.write.recordkey.field=id
   hoodie.datasource.write.partitionpath.field=inserted_at
   hoodie.datasource.write.precombine.field=updated_at
   schema.registry.url={{ schema_url }}
   schema.registry.basic.auth.user.info={{ schema_user }}:{{ schema_key }}
   sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule 
required username='{{ kafka_user }}' password='{{ kafka_key }}';
   bootstrap.servers={{ bootstrap_server }}
   hoodie.embed.timeline.server=false
   
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
   hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
   hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
   ```
   
   **Stacktrace**
   
   
   ```
   Caused by: org.apache.spark.SparkException: Job aborted due to stage 
failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 
in stage 5.0 (TID 14) (10.253.229.42 executor 1): java.lang.NullPointerException
   Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2304)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2253)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2252)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2252)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
        at scala.Option.foreach(Option.scala:407)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2491)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2433)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2422)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2204)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2225)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2244)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
        at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
        at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
        at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
        at 
org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
        at 
org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:177)
        at 
org.apache.hudi.index.simple.HoodieSimpleIndex.fetchRecordLocationsForAffectedPartitions(HoodieSimpleIndex.java:142)
        at 
org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocationInternal(HoodieSimpleIndex.java:113)
        at 
org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocation(HoodieSimpleIndex.java:91)
        at 
org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:54)
        at 
org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:36)
        at 
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
        ... 24 more
   Caused by: java.lang.NullPointerException
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to