[ 
https://issues.apache.org/jira/browse/HUDI-2487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang closed HUDI-2487.
--------------------------
    Fix Version/s:     (was: 0.9.0)
                   0.10.0
       Resolution: Implemented

9067657a5ff313990c819065ad12d71fa8bb0f06

> An empty message in Kafka causes a task exception
> -------------------------------------------------
>
>                 Key: HUDI-2487
>                 URL: https://issues.apache.org/jira/browse/HUDI-2487
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: DeltaStreamer
>            Reporter: qianchutao
>            Assignee: qianchutao
>            Priority: Major
>              Labels: easyfix, newbie, pull-request-available
>             Fix For: 0.10.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> h1. Question:
>       When I use DeltaStreamer in upsert mode to write JSON data from Kafka 
> into Hudi and sync it to Hive tables, the task throws an exception if the 
> value (message body) of a Kafka message is null.
> h2. Exception description:
> Lost task 0.1 in stage 2.0 (TID 24, node-group-1UtpO.1f562475-6982-4b16-a50d-d19b0ebff950.com, executor 6): org.apache.hudi.exception.HoodieException: The value of tmSmp can not be null
>  at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:463)
>  at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$readFromSource$d62e16$1(DeltaSync.java:389)
>  at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
>  at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:196)
>  at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
>  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:58)
>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>  at org.apache.spark.scheduler.Task.run(Task.scala:123)
>  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:413)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1551)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:419)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> h1. Task settings:
>  
> {code:java}
> hoodie.datasource.write.precombine.field=tmSmp
> hoodie.datasource.write.recordkey.field=subOrderId,activityId,ticketId
> hoodie.datasource.hive_sync.partition_fields=db,dt
> hoodie.datasource.write.partitionpath.field=db:SIMPLE,dt:SIMPLE
> hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
> hoodie.datasource.hive_sync.enable=true
> hoodie.datasource.meta.sync.enable=true
> hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
> hoodie.datasource.hive_sync.support_timestamp=true
> hoodie.datasource.hive_sync.auto_create_database=true
> hoodie.meta.sync.client.tool.class=org.apache.hudi.hive.HiveSyncTool
> hoodie.datasource.hive_sync.base_file_format=PARQUET
> {code}
>  
>  
> h1. spark-submit script parameters:
>  
> {code:java}
> --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer
> --source-ordering-field tmSmp \
> --table-type MERGE_ON_READ \
> --target-table ${TABLE_NAME} \
> --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
> --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
> --enable-sync \
> --op UPSERT \
> --continuous
> {code}
>  
>  
>        So I think DeltaStreamer could be improved to avoid this task failure, 
> for example by filtering out Kafka messages whose value is null.
>  
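
The filtering idea proposed above can be illustrated with a minimal sketch. It uses plain Java collections rather than the Spark RDD that DeltaStreamer actually processes, and it is not the code from the Hudi fix: the class `NullValueFilter`, the method `dropNullValues`, and the sample messages are hypothetical names introduced only for illustration. Kafka messages are modelled as key/value pairs whose value may be null.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration only; not part of Hudi.
public class NullValueFilter {

    // Keep only the messages whose value (body) is non-null, so that later
    // steps never try to read a field such as tmSmp from a missing payload.
    public static List<Map.Entry<String, String>> dropNullValues(
            List<Map.Entry<String, String>> messages) {
        return messages.stream()
                .filter(m -> m.getValue() != null)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumed sample batch: the second message has a null value.
        List<Map.Entry<String, String>> batch = Arrays.asList(
                new SimpleEntry<String, String>("k1", "{\"tmSmp\": 1}"),
                new SimpleEntry<String, String>("k2", null),
                new SimpleEntry<String, String>("k3", "{\"tmSmp\": 2}"));

        List<Map.Entry<String, String>> kept = dropNullValues(batch);
        System.out.println(kept.size() + " of " + batch.size() + " messages kept");
    }
}
```

In a Spark job the same predicate would presumably be applied with a filter on the source RDD before records are converted to Avro. Whether null-valued messages should be dropped silently or handled in some other way is a design decision this sketch does not settle.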



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
