[ https://issues.apache.org/jira/browse/HUDI-2487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
vinoyang closed HUDI-2487.
--------------------------
    Fix Version/s:     (was: 0.9.0)
                   0.10.0
       Resolution: Implemented

9067657a5ff313990c819065ad12d71fa8bb0f06

> An empty message in Kafka causes a task exception
> -------------------------------------------------
>
>                 Key: HUDI-2487
>                 URL: https://issues.apache.org/jira/browse/HUDI-2487
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: DeltaStreamer
>            Reporter: qianchutao
>            Assignee: qianchutao
>            Priority: Major
>              Labels: easyfix, newbie, pull-request-available
>             Fix For: 0.10.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> h1. Question:
> When I use DeltaStreamer in upsert mode to write JSON data from Kafka into
> Hudi and sync it to Hive tables, the task throws an exception if the value
> of a Kafka message body is null.
> h2. Exception description:
> Lost task 0.1 in stage 2.0 (TID 24, node-group-1UtpO.1f562475-6982-4b16-a50d-d19b0ebff950.com, executor 6):
> org.apache.hudi.exception.HoodieException: The value of tmSmp can not be null
>  at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:463)
>  at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$readFromSource$d62e16$1(DeltaSync.java:389)
>  at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
>  at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:196)
>  at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
>  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:58)
>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>  at org.apache.spark.scheduler.Task.run(Task.scala:123)
>  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:413)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1551)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:419)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> h1. Task settings:
>
> {code:java}
> hoodie.datasource.write.precombine.field=tmSmp
> hoodie.datasource.write.recordkey.field=subOrderId,activityId,ticketId
> hoodie.datasource.hive_sync.partition_fields=db,dt
> hoodie.datasource.write.partitionpath.field=db:SIMPLE,dt:SIMPLE
> hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
> hoodie.datasource.hive_sync.enable=true
> hoodie.datasource.meta.sync.enable=true
> hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
> hoodie.datasource.hive_sync.support_timestamp=true
> hoodie.datasource.hive_sync.auto_create_database=true
> hoodie.meta.sync.client.tool.class=org.apache.hudi.hive.HiveSyncTool
> hoodie.datasource.hive_sync.base_file_format=PARQUET
> {code}
>
>
> h1. spark-submit script parameters:
>
> {code:java}
> --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
> --source-ordering-field tmSmp \
> --table-type MERGE_ON_READ \
> --target-table ${TABLE_NAME} \
> --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
> --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
> --enable-sync \
> --op UPSERT \
> --continuous \
> {code}
>
>
> So I think this could be improved so that the task does not fail, for
> example by filtering out Kafka messages whose value is null.
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
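
The filtering idea proposed in the issue can be illustrated with a minimal, self-contained sketch. It is not the committed change and does not use Hudi or Spark classes: `NullValueFilter` and `dropNullValues` are hypothetical names, and a `Map.Entry<String, String>` stands in for a Kafka key/value record. The point is only that messages with a null value are dropped before any field (such as the precombine field `tmSmp`) is read from them.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class NullValueFilter {

    // Hypothetical helper: each entry models one Kafka message (key -> JSON value).
    // Messages whose value is null are skipped; the remaining values are returned
    // in their original order.
    public static List<String> dropNullValues(List<Map.Entry<String, String>> messages) {
        return messages.stream()
                .filter(m -> Objects.nonNull(m.getValue()))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> batch = new ArrayList<>();
        batch.add(new SimpleEntry<>("k1", "{\"subOrderId\":\"1\",\"tmSmp\":1}"));
        batch.add(new SimpleEntry<>("k2", null)); // message with a null value
        batch.add(new SimpleEntry<>("k3", "{\"subOrderId\":\"3\",\"tmSmp\":3}"));

        // Only the two non-null payloads reach the downstream parsing step.
        System.out.println(dropNullValues(batch));
    }
}
```

In a Spark-based source, the same predicate would presumably be applied as a `filter` on the record RDD before the values are converted to strings; where exactly that belongs in the DeltaStreamer code path is an assumption here.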