wombatu-kun commented on code in PR #19005:
URL: https://github.com/apache/hudi/pull/19005#discussion_r3434866192


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala:
##########
@@ -158,6 +158,76 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
 
   override def shouldIncludeLogFiles(): Boolean = fullTableScan
 
+  /**
+   * Collects the incremental file slices for the given modified partitions.
+   *
+   * Each returned file slice carries its base file plus all of its log files so the
+   * [[org.apache.hudi.common.table.read.HoodieFileGroupReader]] can perform a correct runtime merge.
+   * Resolving the current value of a record changed in the incremental window requires the full file
+   * slice, not just the files written within the window:
+   *  - EVENT_TIME_ORDERING: a record written in the window may carry a lower ordering value than the
+   *    version already in the base/earlier-log, so the existing version wins; a window-only view
+   *    cannot determine the winner.
+   *  - Partial updates: a window log block holds only the changed columns, so the unchanged columns
+   *    must be filled in from the base file.
+   * (HUDI #18943.)
+   *
+   * The view is built from the (modified) partition listing (which includes the latest base file of
+   * each file group, honoring the metadata table when enabled) and scoped back to the file groups
+   * actually touched in the window (see [[affectedFileGroupIds]]) so untouched file groups are not
+   * read. The commit-time record filter applied during the scan still restricts the returned records
+   * to the incremental window.
+   *
+   * The view timeline is bounded to instants at or before `latestCommit` (the window's last commit)
+   * so that base/log files written by later commits are not visible. Without this bound a record
+   * updated again after the window would be merged with those later log files and its merged commit
+   * time would fall outside the window, dropping the in-window change from the result.
+   */
+  private def collectIncrementalFileSlices(partitionPaths: Seq[String], latestCommit: String): Seq[FileSlice] = {
+    val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext))
+    val fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(
+      engineContext, metaClient, fileIndex.metadataConfig, timeline.findInstantsBeforeOrEquals(latestCommit))
+    try {
+      partitionPaths.flatMap { relativePartitionPath =>
+        fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
+          .filter(fs => affectedFileGroupIds.contains(fs.getFileId))
+      }
+    } finally {
+      fsView.close()
+    }
+  }
+
+  /**
+   * Builds the incremental file slices directly from the files recorded in the window's commits
+   * (`affectedFilesInCommits`), as the read path did before HUDI #18943.
+   *
+   * This is used only when [[hasMissingAffectedFiles]] is true and the full-table-scan fallback is
+   * disabled: some files referenced by the window have been removed (e.g. by cleaning), so there is
+   * no correct incremental result to produce. Listing from the recorded files (which include the
+   * missing paths) preserves the prior fail-early contract - the scan surfaces a file-not-found
+   * error pointing the user at `hoodie.datasource.read.incr.fallback.fulltablescan.enable` - instead
+   * of silently returning an empty/partial result from a fresh listing that no longer sees those
+   * files.
+   */
+  private def legacyAffectedFileSlices(partitionPaths: Seq[String], latestCommit: String): Seq[FileSlice] = {

Review Comment:
   `legacyAffectedFileSlices` and `collectIncrementalFileSlices` share the same 
`partitionPaths.flatMap { p => view.getLatestMergedFileSlicesBeforeOrOn(p, 
latestCommit).iterator().asScala [.filter(...)] }` body wrapped in `try/finally 
view.close()`, differing only in how the view is built and whether the 
file-group filter applies. Consider a private helper taking the view plus a 
`FileSlice => Boolean` predicate that owns the `try/finally close`, so the two 
call sites cannot drift on the close() contract.
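
A minimal sketch of the helper shape suggested above, written against stand-in types so it is self-contained. `Slice`, `SliceView`, `SliceCollector.collectAndClose`, and `InMemoryView` are hypothetical names for illustration only; they are not Hudi APIs, and the real helper would take the actual file-system view and `FileSlice`. The result is forced into a `Vector` inside the `try`, so nothing lazy can touch the view after it has been closed.

```scala
// Hypothetical stand-ins for Hudi's FileSlice and file-system view (NOT the real API).
final case class Slice(partition: String, fileId: String)

trait SliceView extends AutoCloseable {
  def latestMergedSlicesBeforeOrOn(partition: String, instant: String): Iterator[Slice]
}

object SliceCollector {
  // Owns the try/finally: both call sites would pass their own view and predicate,
  // and neither would need to remember to close the view.
  def collectAndClose(view: SliceView, partitionPaths: Seq[String], latestCommit: String)
                     (keep: Slice => Boolean): Seq[Slice] = {
    try {
      partitionPaths.iterator
        .flatMap(p => view.latestMergedSlicesBeforeOrOn(p, latestCommit))
        .filter(keep)
        .toVector // materialize before the view is closed
    } finally {
      view.close()
    }
  }
}

// Toy in-memory view, used only to exercise the helper.
final class InMemoryView(slices: Seq[Slice]) extends SliceView {
  var closed = false
  def latestMergedSlicesBeforeOrOn(partition: String, instant: String): Iterator[Slice] =
    slices.iterator.filter(_.partition == partition)
  def close(): Unit = closed = true
}
```

Under this shape, the scoped call site would pass a predicate equivalent to `fs => affectedFileGroupIds.contains(fs.getFileId)` and the legacy one `_ => true`, each with its own view.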



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala:
##########
@@ -158,6 +158,76 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
 
   override def shouldIncludeLogFiles(): Boolean = fullTableScan
 
+  /**
+   * Collects the incremental file slices for the given modified partitions.
+   *
+   * Each returned file slice carries its base file plus all of its log files so the
+   * [[org.apache.hudi.common.table.read.HoodieFileGroupReader]] can perform a correct runtime merge.
+   * Resolving the current value of a record changed in the incremental window requires the full file
+   * slice, not just the files written within the window:
+   *  - EVENT_TIME_ORDERING: a record written in the window may carry a lower ordering value than the
+   *    version already in the base/earlier-log, so the existing version wins; a window-only view
+   *    cannot determine the winner.
+   *  - Partial updates: a window log block holds only the changed columns, so the unchanged columns
+   *    must be filled in from the base file.
+   * (HUDI #18943.)
+   *
+   * The view is built from the (modified) partition listing (which includes the latest base file of
+   * each file group, honoring the metadata table when enabled) and scoped back to the file groups
+   * actually touched in the window (see [[affectedFileGroupIds]]) so untouched file groups are not
+   * read. The commit-time record filter applied during the scan still restricts the returned records
+   * to the incremental window.
+   *
+   * The view timeline is bounded to instants at or before `latestCommit` (the window's last commit)
+   * so that base/log files written by later commits are not visible. Without this bound a record
+   * updated again after the window would be merged with those later log files and its merged commit
+   * time would fall outside the window, dropping the in-window change from the result.
+   */

Review Comment:
   This `affectedFileGroupIds` filter is the only thing scoping the 
metadata-aware partition view back to the touched file groups, but no test 
covers a modified partition that holds an untouched sibling file group. 
`testPartialUpdateIncrementalQueryPartitioned` writes one updated record per 
partition, so with `MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT=0` each partition 
is a single file group and this filter is a no-op - removing it would fail no 
test. Suggest a case that lands multiple file groups in one partition and 
updates only one, asserting the untouched sibling is excluded. Output stays 
correct via the post-merge commit-time filter either way, so this guards the 
scoping/perf path.


