[ https://issues.apache.org/jira/browse/HUDI-1719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
sivabalan narayanan resolved HUDI-1719.
---------------------------------------
    Resolution: Fixed

> hive on spark/mr,Incremental query of the mor table, the partition field is incorrect
> -------------------------------------------------------------------------------------
>
>                 Key: HUDI-1719
>                 URL: https://issues.apache.org/jira/browse/HUDI-1719
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Hive Integration
>    Affects Versions: 0.7.0, 0.8.0
>         Environment: spark2.4.5, hadoop 3.1.1, hive 3.1.1
>            Reporter: tao meng
>            Assignee: tao meng
>            Priority: Major
>              Labels: pull-request-available, sev:critical, user-support-issues
>             Fix For: 0.9.0
>
>
> Currently Hudi uses HoodieCombineHiveInputFormat to implement incremental queries
> on MOR tables. When there are small files in different partitions,
> HoodieCombineHiveInputFormat combines the readers for those small files.
> However, HoodieCombineHiveInputFormat builds the partition fields based on the
> first file reader only, even though it also holds file readers that come from
> other partitions. As a result, rows read from those other files are reported
> with the first file's partition values.
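The following is a minimal, self-contained Scala model of the failure mode described above. It is not Hudi's actual HoodieCombineHiveInputFormat code. It assumes, hypothetically, that partition values are derived from the Hive-style `k=v` path segments of the file currently being read. `FileChunk`, `CombinedReaderSketch`, `readStale` and `readRefreshed` are illustrative names only. `readStale` fixes the partition values from the first file in the combined split, as the issue describes. `readRefreshed` recomputes them every time the reader moves to another file, which matches the spirit of updating the IO context when switching readers.

```scala
// Hypothetical model: one file inside a combined split, with the keys it contains.
final case class FileChunk(path: String, keys: Seq[Long])

object CombinedReaderSketch {
  // Parse Hive-style "k=v" directory segments of a path into partition values.
  def partitionValues(path: String): Map[String, String] =
    path.split("/").toSeq
      .filter(_.contains("="))
      .map { seg =>
        val i = seg.indexOf('=')
        seg.substring(0, i) -> seg.substring(i + 1)
      }
      .toMap

  // Behaviour reported in the issue: partition values come from the first file only.
  def readStale(chunks: Seq[FileChunk]): Seq[(Map[String, String], Long)] = {
    val pv = partitionValues(chunks.head.path)
    chunks.flatMap(c => c.keys.map(k => (pv, k)))
  }

  // Behaviour the fix calls for: refresh partition values whenever the reader switches files.
  def readRefreshed(chunks: Seq[FileChunk]): Seq[(Map[String, String], Long)] =
    chunks.flatMap { c =>
      val pv = partitionValues(c.path)
      c.keys.map(k => (pv, k))
    }

  def main(args: Array[String]): Unit = {
    // Two small files from different partitions, combined into one split.
    val chunks = Seq(
      FileChunk("hive_8b/p=0/p1=0/p2=6/file-a", Seq(0L, 1L)),
      FileChunk("hive_8b/p=0/p1=0/p2=7/file-b", Seq(0L, 1L))
    )
    println(readStale(chunks).map(_._1("p2")))     // List(6, 6, 6, 6)
    println(readRefreshed(chunks).map(_._1("p2"))) // List(6, 6, 7, 7)
  }
}
```

The stale variant reproduces the symptom shown in the query result: rows that actually come from the p2=7 file are reported with p2=6.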
> When switching between these readers, we should update the ioctx (IO context)
> to match the current reader.
>
> Test env:
> spark2.4.5, hadoop 3.1.1, hive 3.1.1
>
> Test steps:
>
> step1:
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.spark.sql.functions.{expr, lit}
>
> val df = spark.range(0, 10000).toDF("keyid")
>   .withColumn("col3", expr("keyid + 10000000"))
>   .withColumn("p", lit(0))
>   .withColumn("p1", lit(0))
>   .withColumn("p2", lit(6))
>   .withColumn("a1", lit(Array[String]("sb1", "rz")))
>   .withColumn("a2", lit(Array[String]("sb1", "rz")))
> // create a Hudi MOR table with three partition levels: p, p1, p2
> // (merge is the reporter's own write helper; its definition is not included here)
> merge(df, 4, "default", "hive_8b",
>   DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")
>
> step2:
> val df = spark.range(0, 10000).toDF("keyid")
>   .withColumn("col3", expr("keyid + 10000000"))
>   .withColumn("p", lit(0))
>   .withColumn("p1", lit(0))
>   .withColumn("p2", lit(7))
>   .withColumn("a1", lit(Array[String]("sb1", "rz")))
>   .withColumn("a2", lit(Array[String]("sb1", "rz")))
> // upsert into the current table (new data lands in partition p2=7)
> merge(df, 4, "default", "hive_8b",
>   DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "upsert")
>
> hive beeline:
> set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
> set hoodie.hive_8b.consume.mode=INCREMENTAL;
> set hoodie.hive_8b.consume.max.commits=3;
> -- this timestamp is earlier than the earliest commit, so the query covers all commits
> set hoodie.hive_8b.consume.start.timestamp=20210325141300;
> select `p`, `p1`, `p2`, `keyid` from hive_8b_rt
> where `_hoodie_commit_time` > '20210325141300' and `keyid` < 5;
>
> query result:
> +----+-----+-----+--------+
> | p  | p1  | p2  | keyid  |
> +----+-----+-----+--------+
> | 0  | 0   | 6   | 0      |
> | 0  | 0   | 6   | 1      |
> | 0  | 0   | 6   | 2      |
> | 0  | 0   | 6   | 3      |
> | 0  | 0   | 6   | 4      |
> | 0  | 0   | 6   | 4      |
> | 0  | 0   | 6   | 0      |
> | 0  | 0   | 6   | 3      |
> | 0  | 0   | 6   | 2      |
> | 0  | 0   | 6   | 1      |
> +----+-----+-----+--------+
>
> This result is wrong: in step 2 we inserted new data with p2=7, yet no row in
> the query result has p2=7; every row shows p2=6.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)