[ 
https://issues.apache.org/jira/browse/HUDI-2058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17375486#comment-17375486
 ] 

ASF GitHub Bot commented on HUDI-2058:
--------------------------------------

xiarixiaoyao commented on a change in pull request #3139:
URL: https://github.com/apache/hudi/pull/3139#discussion_r663752808



##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
##########
@@ -96,14 +100,31 @@ class IncrementalRelation(val sqlContext: SQLContext,
     val regularFileIdToFullPath = mutable.HashMap[String, String]()
     var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
 
+    // create Replaced file group
+    val replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline
+    val replacedFile = replacedTimeline.getInstants.collect(Collectors.toList[HoodieInstant]).flatMap { instant =>
+      val replaceMetadata = HoodieReplaceCommitMetadata.
+        fromBytes(metaClient.getActiveTimeline.getInstantDetails(instant).get, classOf[HoodieReplaceCommitMetadata])
+      replaceMetadata.getPartitionToReplaceFileIds.entrySet().flatMap { entry =>
+        entry.getValue.map { e =>
+          val fullPath = FSUtils.getPartitionPath(basePath, entry.getKey).toString
+          (e, fullPath)
+        }
+      }
+    }.toMap
+
     for (commit <- commitsToReturn) {
       val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
         .get, classOf[HoodieCommitMetadata])
 
       if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
-        metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
+        metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>

Review comment:
       Already added!






> support incremental query for insert_overwrite_table/insert_overwrite 
> operation on cow table
> --------------------------------------------------------------------------------------------
>
>                 Key: HUDI-2058
>                 URL: https://issues.apache.org/jira/browse/HUDI-2058
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Incremental Pull
>    Affects Versions: 0.8.0
>         Environment: hadoop 3.1.1
> spark3.1.1
> hive 3.1.1
>            Reporter: tao meng
>            Assignee: tao meng
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.9.0
>
>
> When an incremental query spans multiple commits before and after a 
> replacecommit, the query result contains data from the old (replaced) files. 
> Note: MOR tables are fine; only COW tables have this problem.
>  
> When the incr_view of a COW table is queried, the replacecommit is ignored, 
> which leads to the wrong result. 
>  
>  
> Test steps:
> Step 1: create a DataFrame
> val df = spark.range(0, 10).toDF("keyid")
>  .withColumn("col3", expr("keyid"))
>  .withColumn("age", lit(1))
>  .withColumn("p", lit(2))
>  
> Step 2: insert df into an empty Hudi table
> df.write.format("hudi").
>  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
>  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col3").
>  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "keyid").
>  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "").
>  option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, 
> "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
>  option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert").
>  option("hoodie.insert.shuffle.parallelism", "4").
>  option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
>  .mode(SaveMode.Overwrite).save(basePath)
>  
> Step 3: run insert_overwrite_table on the same table
> df.write.format("hudi").
>  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
>  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col3").
>  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "keyid").
>  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "").
>  option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, 
> "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
>  option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert_overwrite_table").
>  option("hoodie.insert.shuffle.parallelism", "4").
>  option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
>  .mode(SaveMode.Append).save(basePath)
>  
> Step 4: run an incremental query on the table
> spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, 
> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
>  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "0000")
>  .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, currentCommits(0))
>  .load(basePath).select("keyid").orderBy("keyid").show(100, false)
>  
> Result: the output contains the old data (every keyid appears twice)
> +-----+
> |keyid|
> +-----+
> |0 |
> |0 |
> |1 |
> |1 |
> |2 |
> |2 |
> |3 |
> |3 |
> |4 |
> |4 |
> |5 |
> |5 |
> |6 |
> |6 |
> |7 |
> |7 |
> |8 |
> |8 |
> |9 |
> |9 |
> +-----+
>  
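
The duplicated rows above follow from reading both the originally inserted file group and the one written by insert_overwrite_table. A minimal sketch of the idea behind the fix, written with plain Scala collections and no Hudi classes: file IDs recorded as replaced are dropped from the fileId-to-path map before files are read. The quoted diff is cut off at `filterNot`, so the predicate used here is an assumption, and the object name, function name, file IDs, and paths are all hypothetical.

```scala
// Plain-Scala illustration only; it does not call any Hudi API.
object ReplacedFileFilterSketch {

  // Drop every entry whose file ID was recorded as replaced by a replacecommit.
  // ASSUMPTION: matching on file ID alone; the real predicate in the PR is truncated.
  def dropReplaced(fileIdToFullPath: Map[String, String],
                   replacedFileIds: Set[String]): Map[String, String] =
    fileIdToFullPath.filterNot { case (fileId, _) => replacedFileIds.contains(fileId) }

  def main(args: Array[String]): Unit = {
    // Hypothetical file groups written by the two commits in the test steps.
    val written = Map(
      "fg-1" -> "/tmp/hoodie_test/fg-1.parquet", // written by the initial insert
      "fg-2" -> "/tmp/hoodie_test/fg-2.parquet"  // written by insert_overwrite_table
    )
    // Hypothetical: the replacecommit marks the first file group as replaced.
    val replaced = Set("fg-1")

    val visible = dropReplaced(written, replaced)
    println(visible.keys.mkString(","))
  }
}
```

With the replaced file group removed, only the file written by the overwrite remains, so each keyid would be read once instead of twice.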



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
