tao meng created HUDI-1719:
------------------------------

             Summary: Hive on Spark/MR: incremental query of a MOR table
returns incorrect partition field values
                 Key: HUDI-1719
                 URL: https://issues.apache.org/jira/browse/HUDI-1719
             Project: Apache Hudi
          Issue Type: Bug
          Components: Hive Integration
    Affects Versions: 0.7.0, 0.8.0
         Environment: spark2.4.5, hadoop 3.1.1, hive 3.1.1
            Reporter: tao meng
             Fix For: 0.9.0


Hudi currently uses HoodieCombineHiveInputFormat to serve incremental queries
on MOR tables.

When there are small files across different partitions,
HoodieCombineHiveInputFormat combines their readers into a single split. It
builds the partition field values from the first file reader only, even though
the combined split holds readers coming from other partitions as well.

When switching readers, the IO context (ioctx) should be updated so that the
partition values match the partition of the current reader.
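To make the failure mode concrete, here is a minimal, self-contained Scala
sketch of the reader-switching logic. The names (PartitionReader,
CombinedReader, partitionCtx) are hypothetical and are not Hudi's actual
classes; the point is only that a combined reader must refresh its partition
context every time it advances to the next child reader, otherwise every row
is stamped with the first reader's partition values.

```scala
// Hypothetical model of a combined reader (not Hudi's real API).
// Each child reader serves rows from exactly one partition.
case class PartitionReader(partitionValues: Map[String, String],
                           rows: Iterator[Long])

class CombinedReader(readers: Seq[PartitionReader]) {
  private val it = readers.iterator
  private var current: PartitionReader = it.next()
  // The bug corresponds to setting this once from the first reader and
  // never touching it again; the fix is to refresh it on every switch.
  private var partitionCtx: Map[String, String] = current.partitionValues

  /** Returns the next (row, partitionValues) pair, or None when exhausted. */
  def nextRow(): Option[(Long, Map[String, String])] = {
    while (!current.rows.hasNext && it.hasNext) {
      current = it.next()
      partitionCtx = current.partitionValues // the fix: update ctx on switch
    }
    if (current.rows.hasNext) Some((current.rows.next(), partitionCtx))
    else None
  }
}
```

With the context refreshed on each switch, rows read after the switch carry
the second partition's values (p2=7 in the repro below) instead of repeating
the first partition's values for every row.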

Test environment: spark2.4.5, hadoop 3.1.1, hive 3.1.1

Test steps:

Step 1: bulk-insert initial data into partition p2=6:

val df = spark.range(0, 10000).toDF("keyid")
 .withColumn("col3", expr("keyid + 10000000"))
 .withColumn("p", lit(0))
 .withColumn("p1", lit(0))
 .withColumn("p2", lit(6))
 .withColumn("a1", lit(Array[String]("sb1", "rz")))
 .withColumn("a2", lit(Array[String]("sb1", "rz")))

// create a Hudi table with three partition levels: p, p1, p2

merge(df, 4, "default", "hive_8b", 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")

 

Step 2: upsert new data into partition p2=7:

val df = spark.range(0, 10000).toDF("keyid")
 .withColumn("col3", expr("keyid + 10000000"))
 .withColumn("p", lit(0))
 .withColumn("p1", lit(0))
 .withColumn("p2", lit(7))
 .withColumn("a1", lit(Array[String]("sb1", "rz")))
 .withColumn("a2", lit(Array[String]("sb1", "rz")))

// upsert into the same table

merge(df, 4, "default", "hive_8b", 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "upsert")

Step 3: run an incremental query from Hive Beeline:

set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;

set hoodie.hive_8b.consume.mode=INCREMENTAL;

set hoodie.hive_8b.consume.max.commits=3;

set hoodie.hive_8b.consume.start.timestamp=20210325141300;

select `p`, `p1`, `p2`,`keyid` from hive_8b_rt where 
`_hoodie_commit_time`>'20210325141300' and `keyid` < 5;

Query result:

+----+-----+-----+--------+
| p  | p1  | p2  | keyid  |
+----+-----+-----+--------+
| 0  | 0   | 6   | 0      |
| 0  | 0   | 6   | 1      |
| 0  | 0   | 6   | 2      |
| 0  | 0   | 6   | 3      |
| 0  | 0   | 6   | 4      |
| 0  | 0   | 6   | 4      |
| 0  | 0   | 6   | 0      |
| 0  | 0   | 6   | 3      |
| 0  | 0   | 6   | 2      |
| 0  | 0   | 6   | 1      |
+----+-----+-----+--------+

This result is wrong: in step 2 we inserted new data with p2=7, yet the query
result contains no rows with p2=7; every row reports p2=6.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
