[ https://issues.apache.org/jira/browse/HUDI-1722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

tao meng updated HUDI-1722:
---------------------------
    Description: 
HUDI-892 introduced this problem.
That PR skips adding the projection columns when there are no log files in the hoodieRealtimeSplit, but it does not take into account that multiple getRecordReaders share the same jobConf.
Consider the following scenario: we have four getRecordReaders:
reader1 (its hoodieRealtimeSplit contains no log files)
reader2 (its hoodieRealtimeSplit contains log files)
reader3 (its hoodieRealtimeSplit contains log files)
reader4 (its hoodieRealtimeSplit contains no log files)

Now reader1 runs first: HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP is set to true in the jobConf, and no additional Hudi projection columns are added to it (see HoodieParquetRealtimeInputFormat.addProjectionToJobConf).

reader2 runs later: since HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP is already true in the shared jobConf, no additional Hudi projection columns are added for it either (see HoodieParquetRealtimeInputFormat.addProjectionToJobConf). As a result, _hoodie_record_key is missing from reader2's projection and the merge step throws an NPE.
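
For illustration, here is a rough Scala sketch of the projection logic described above; the method shape and the property's string value are simplified assumptions, not the actual Hudi source:

import org.apache.hadoop.mapred.JobConf

// Illustrative key; the real constant is HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP.
val HOODIE_READ_COLUMNS_PROP = "hoodie.read.columns.set"

// Simplified sketch of addProjectionToJobConf (an assumption, not the exact source).
def addProjectionToJobConf(splitHasLogFiles: Boolean, jobConf: JobConf): Unit = {
  // The flag lives in the shared jobConf, so the first reader to run decides for every later reader.
  if (jobConf.get(HOODIE_READ_COLUMNS_PROP) == null) {
    if (splitHasLogFiles) {
      // append _hoodie_record_key / _hoodie_commit_time to the Hive read-column properties
    }
    // A split without log files reaches this point without adding the columns,
    // yet still marks the shared conf as already projected.
    jobConf.set(HOODIE_READ_COLUMNS_PROP, "true")
  }
}

With the flag already set by reader1, reader2 skips the projection, which produces the failure below: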

2021-03-25 20:23:14,014 | INFO  | AsyncDispatcher event handler | Diagnostics report from attempt_1615883368881_0038_m_000000_0: Error: java.lang.NullPointerException
 at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:101)
 at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:43)
 at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.next(HoodieRealtimeRecordReader.java:79)
 at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.next(HoodieRealtimeRecordReader.java:36)
 at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:92)
 at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:43)
 at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.next(HoodieRealtimeRecordReader.java:79)
 at org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader.next(HoodieCombineRealtimeRecordReader.java:68)
 at org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader.next(HoodieCombineRealtimeRecordReader.java:77)
 at org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader.next(HoodieCombineRealtimeRecordReader.java:42)
 at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:205)
 at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:191)
 at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:52)
 at org.apache.hadoop.hive.ql.exec.mr.ExecMapRunner.run(ExecMapRunner.java:37)
 at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
 at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
 at org.apache.hadoop.mapred.YarnChild$1.run(YarnChild.java:183)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1761)
 at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:177)

 

Obviously, this is an intermittent problem: if reader2 happens to run first, the additional Hudi projection columns are added to the jobConf and the query succeeds.

Spark SQL can avoid this problem by setting spark.hadoop.cloneConf=true, which Spark does not recommend; Hive, however, has no way to avoid it.
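
As a reference for the workaround above, a minimal sketch of a Spark session with the setting applied (the session setup itself is an assumption; only the property name and the query come from this report):

import org.apache.spark.sql.SparkSession

// spark.hadoop.cloneConf=true makes Spark clone the JobConf per task instead of
// letting the record readers mutate one shared conf. Per the note above, Spark
// does not recommend it, and Hive has no equivalent switch.
val spark = SparkSession.builder()
  .appName("hudi-mor-rt-read")
  .config("spark.hadoop.cloneConf", "true")   // can also be passed as --conf on spark-sql/spark-shell
  .enableHiveSupport()
  .getOrCreate()

spark.sql("select count(col3) from hive_14b_rt").show()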

Test steps:

step1:

val df = spark.range(0, 100000).toDF("keyid")
 .withColumn("col3", expr("keyid"))
 .withColumn("p", lit(0))
 .withColumn("p1", lit(0))
 .withColumn("p2", lit(7))
 .withColumn("a1", lit(Array[String] ("sb1", "rz")))
 .withColumn("a2", lit(Array[String] ("sb1", "rz")))

// create hoodie table hive_14b

 merge(df, 4, "default", "hive_14b", 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")

Note: bulk_insert produces four base files in the Hudi table.
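
merge(...) above is the reporter's own test helper; its body is not included in the report. A minimal sketch of an equivalent write with the Hudi Spark datasource, assuming keyid as the record key, col3 as the precombine field, and p/p1/p2 as the partition fields (all assumptions):

import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical stand-in for the reporter's merge() helper: writes df as a
// MERGE_ON_READ Hudi table using the given operation ("bulk_insert" or "upsert").
// Field choices and the base path are assumptions for illustration only.
def merge(df: DataFrame, parallelism: Int, db: String, table: String,
          tableType: String, op: String): Unit = {
  df.write.format("hudi")
    .option("hoodie.table.name", table)
    .option("hoodie.datasource.write.table.type", tableType)        // MERGE_ON_READ
    .option("hoodie.datasource.write.operation", op)
    .option("hoodie.datasource.write.recordkey.field", "keyid")
    .option("hoodie.datasource.write.precombine.field", "col3")
    .option("hoodie.datasource.write.partitionpath.field", "p,p1,p2")
    .option("hoodie.bulkinsert.shuffle.parallelism", parallelism.toString)
    .option("hoodie.upsert.shuffle.parallelism", parallelism.toString)
    .option("hoodie.datasource.hive_sync.enable", "true")            // registers hive_14b_ro / hive_14b_rt
    .option("hoodie.datasource.hive_sync.database", db)
    .option("hoodie.datasource.hive_sync.table", table)
    .option("hoodie.datasource.hive_sync.partition_fields", "p,p1,p2")
    .mode(SaveMode.Append)
    .save(s"/tmp/$db/$table")                                        // assumed base path
}

Step 2 below calls the same helper with op = "upsert", which produces the single log file mentioned later in the report.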

step2:

 val df = spark.range(99999, 100002).toDF("keyid")
 .withColumn("col3", expr("keyid"))
 .withColumn("p", lit(0))
 .withColumn("p1", lit(0))
 .withColumn("p2", lit(7))
 .withColumn("a1", lit(Array[String] ("sb1", "rz")))
 .withColumn("a2", lit(Array[String] ("sb1", "rz")))

// upsert table 

merge(df, 4, "default", "hive_14b", 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "upsert")

Now we have four base files and one log file in the Hudi table.

step3: 

spark-sql/beeline: 

 select count(col3) from hive_14b_rt;

Then the query fails.

  was:
HUDI-892 introduced this problem.
That PR skips adding the projection columns when there are no log files in the hoodieRealtimeSplit, but it does not take into account that multiple getRecordReaders share the same jobConf.
Consider the following scenario: we have four getRecordReaders:
reader1 (its hoodieRealtimeSplit contains no log files)
reader2 (its hoodieRealtimeSplit contains log files)
reader3 (its hoodieRealtimeSplit contains log files)
reader4 (its hoodieRealtimeSplit contains no log files)

Now reader1 runs first: HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP is set to true in the jobConf, and no additional Hudi projection columns are added to it (see HoodieParquetRealtimeInputFormat.addProjectionToJobConf).

reader2 runs later: since HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP is already true in the shared jobConf, no additional Hudi projection columns are added for it either (see HoodieParquetRealtimeInputFormat.addProjectionToJobConf). As a result, _hoodie_record_key is missing from reader2's projection and the merge step throws an NPE:

Caused by: java.io.IOException: java.lang.NullPointerException
 at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:611)
 at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:518)
 at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:150)
 at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:3296)
 at org.apache.hadoop.hive.ql.reexec.ReExecDriver.getResults(ReExecDriver.java:252)
 at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:522)
 ... 24 more
Caused by: java.lang.NullPointerException
 at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:93)
 at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:43)
 at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.next(HoodieRealtimeRecordReader.java:79)
 at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.next(HoodieRealtimeRecordReader.java:36)
 at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:578)
 at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:518)
 at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:150)
 at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:3296)
 at org.apache.hadoop.hive.ql.reexec.ReExecDriver.getResults(ReExecDriver.java:252)
 at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:522)

Obviously, this is an intermittent problem: if reader2 happens to run first, the additional Hudi projection columns are added to the jobConf and the query succeeds.

Spark SQL can avoid this problem by setting spark.hadoop.cloneConf=true, which Spark does not recommend; Hive, however, has no way to avoid it.

 

 

 


> hive beeline/spark-sql query of a specified field on a MOR table throws an NPE
> -------------------------------------------------------------------------------
>
>                 Key: HUDI-1722
>                 URL: https://issues.apache.org/jira/browse/HUDI-1722
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Hive Integration, Spark Integration
>    Affects Versions: 0.7.0
>         Environment: spark2.4.5, hadoop3.1.1, hive 3.1.1
>            Reporter: tao meng
>            Priority: Major
>             Fix For: 0.9.0
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
