hudi-agent commented on code in PR #19005:
URL: https://github.com/apache/hudi/pull/19005#discussion_r3423027372


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala:
##########
@@ -194,8 +270,7 @@ trait HoodieIncrementalRelationV2Trait extends HoodieBaseRelation {
     val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.key,
       DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.defaultValue).toBoolean
 
-    fallbackToFullTableScan && (startInstantArchived
-      || affectedFilesInCommits.asScala.exists(fileStatus => !metaClient.getStorage.exists(fileStatus.getPath)))
+    fallbackToFullTableScan && (startInstantArchived || hasMissingAffectedFiles)

Review Comment:
   🤖 With `fallbackToFullTableScan=false`, `hasMissingAffectedFiles` is now
   also evaluated in `collectFileSplits`/`listFileSplits` to choose between the
   new and legacy paths. That triggers one `exists()` call per affected file at
   plan time, even in cases where the check was previously short-circuited. This
   could be slow for wide windows on object stores (one HEAD request per file).
   The `lazy val` avoids repeated evaluation, so it is probably fine, but it is
   worth confirming that this matches the intent.
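
To make the concern concrete, here is a minimal, self-contained sketch of the evaluation order. The `Relation` class, `fileExists` and `useLegacyPath` are hypothetical stand-ins, not the actual Hudi code; only the shape of the `&&` short-circuit and the `lazy val` is taken from the diff.

```scala
object FallbackCheckSketch {
  // Hypothetical stand-in for the relation; `fileExists` models one storage
  // existence check (for example a HEAD request on an object store).
  final class Relation(fallbackEnabled: Boolean,
                       startInstantArchived: Boolean,
                       affectedFiles: Seq[String],
                       fileExists: String => Boolean) {

    // Evaluated at most once, and only when something first reads it.
    lazy val hasMissingAffectedFiles: Boolean =
      affectedFiles.exists(file => !fileExists(file))

    // `&&` short-circuits: with the flag off, this caller never triggers the checks.
    def shouldFallback: Boolean =
      fallbackEnabled && (startInstantArchived || hasMissingAffectedFiles)

    // Hypothetical second caller that reads the value regardless of the flag,
    // which is the situation the comment asks about.
    def useLegacyPath: Boolean = hasMissingAffectedFiles
  }
}
```

In this sketch `shouldFallback` costs nothing when the flag is off, while the first call to `useLegacyPath` pays for one check per affected file and later calls reuse the cached result.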
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -353,6 +388,48 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
     }
   }
 
+  /**
+   * The incremental window end (latest commit time in the window), extracted from the `In` filter on
+   * the commit-time metadata field in `requiredFilters`. Used to bound the merge inputs so that log
+   * blocks committed after the window are not merged. Returns None when no such filter is present.
+   */
+  private def incrementalWindowEnd(requiredFilters: Seq[Filter]): Option[String] = {
+    val commitTimes: Seq[String] = requiredFilters.collect {
+      case In(attr, values) if attr == HoodieRecord.COMMIT_TIME_METADATA_FIELD =>
+        values.filter(_ != null).map(_.toString).toSeq
+    }.flatten
+    if (commitTimes.isEmpty) None else Some(commitTimes.max)
+  }
+
+  /**
+   * Builds a predicate over merged rows that keeps only the records whose `_hoodie_commit_time`
+   * falls within the incremental query window encoded by `requiredFilters` (an `In` filter on the
+   * commit-time metadata field, produced by the incremental relations). Returns None when no such
+   * filter can be built (e.g. the commit-time field is not part of the read schema), in which case
+   * the caller keeps the original read-time filter pushdown.
+   */
+  private def buildCommitTimeRowFilter(requiredFilters: Seq[Filter],
+                                       readSchema: StructType): Option[JPredicate[InternalRow]] = {
+    if (requiredFilters.isEmpty) {
+      None
+    } else {
+      readSchema.getFieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD).flatMap { idx =>
+        val allowedCommitTimes: Set[String] = requiredFilters.collect {
+          case In(attr, values) if attr == HoodieRecord.COMMIT_TIME_METADATA_FIELD =>
+            values.filter(_ != null).map(_.toString).toSet
+        }.flatten.toSet
+        if (allowedCommitTimes.isEmpty) {
+          None
+        } else {
+          Some(new JPredicate[InternalRow] {
+            override def test(row: InternalRow): Boolean =
+              !row.isNullAt(idx) && allowedCommitTimes.contains(row.getUTF8String(idx).toString)

Review Comment:
   🤖 This `row.getUTF8String(idx).toString` allocates a Java `String` for every
   merged row in the file group. Would it be worth pre-converting
   `allowedCommitTimes` to `Set[UTF8String]` once and comparing `UTF8String`
   values directly to skip the per-row allocation? This could matter on wide
   incremental windows.
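
A rough sketch of the idea, kept self-contained so it runs without Spark on the classpath. `Utf8Key` is a hypothetical stand-in for Spark's `UTF8String` (a byte-backed value with content-based `equals`/`hashCode`), and `buildCommitTimeTest` is an illustrative name, not part of the PR. In the real code the conversion would presumably go through `UTF8String.fromString`.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Arrays

object CommitTimeFilterSketch {
  // Hypothetical stand-in for Spark's UTF8String: bytes with value equality.
  final class Utf8Key(val bytes: Array[Byte]) {
    override def equals(other: Any): Boolean = other match {
      case that: Utf8Key => Arrays.equals(bytes, that.bytes)
      case _             => false
    }
    override def hashCode(): Int = Arrays.hashCode(bytes)
  }

  object Utf8Key {
    def fromString(s: String): Utf8Key = new Utf8Key(s.getBytes(UTF_8))
  }

  // Convert the allowed commit times once, outside the per-row path.
  // The returned function does no String allocation per call.
  def buildCommitTimeTest(allowedCommitTimes: Set[String]): Utf8Key => Boolean = {
    val allowed: Set[Utf8Key] = allowedCommitTimes.map(Utf8Key.fromString)
    key => key != null && allowed.contains(key)
  }
}
```

The trade-off is one extra set built per file group read in exchange for dropping a `String` allocation on every merged row.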
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -353,6 +388,48 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
     }
   }
 
+  /**
+   * The incremental window end (latest commit time in the window), extracted from the `In` filter on
+   * the commit-time metadata field in `requiredFilters`. Used to bound the merge inputs so that log
+   * blocks committed after the window are not merged. Returns None when no such filter is present.
+   */
+  private def incrementalWindowEnd(requiredFilters: Seq[Filter]): Option[String] = {
+    val commitTimes: Seq[String] = requiredFilters.collect {
+      case In(attr, values) if attr == HoodieRecord.COMMIT_TIME_METADATA_FIELD =>
+        values.filter(_ != null).map(_.toString).toSeq
+    }.flatten
+    if (commitTimes.isEmpty) None else Some(commitTimes.max)
+  }
+
+  /**
+   * Builds a predicate over merged rows that keeps only the records whose `_hoodie_commit_time`
+   * falls within the incremental query window encoded by `requiredFilters` (an `In` filter on the
+   * commit-time metadata field, produced by the incremental relations). Returns None when no such
+   * filter can be built (e.g. the commit-time field is not part of the read schema), in which case
+   * the caller keeps the original read-time filter pushdown.
+   */
+  private def buildCommitTimeRowFilter(requiredFilters: Seq[Filter],
+                                       readSchema: StructType): Option[JPredicate[InternalRow]] = {
+    if (requiredFilters.isEmpty) {
+      None
+    } else {
+      readSchema.getFieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD).flatMap { idx =>
+        val allowedCommitTimes: Set[String] = requiredFilters.collect {

Review Comment:
   🤖 nit: the `case In(attr, values) if attr == HoodieRecord.COMMIT_TIME_METADATA_FIELD => values.filter(_ != null).map(_.toString)`
   pattern is duplicated verbatim in both `incrementalWindowEnd` (line 397) and
   here. Have you considered pulling it into a small private helper like
   `commitTimesFromFilters(filters: Seq[Filter]): Set[String]` to keep the two
   callers in sync if the filter shape ever changes?
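
A possible shape for that helper, as a self-contained sketch. The `Filter`, `In` and `EqualTo` types below are hypothetical stand-ins that mirror the shape of Spark's data source filters so the snippet runs on its own, and `CommitTimeField` stands in for `HoodieRecord.COMMIT_TIME_METADATA_FIELD` (assumed here to be `_hoodie_commit_time`, per the doc comment in the diff).

```scala
object CommitTimeHelperSketch {
  // Hypothetical stand-ins mirroring the shape of Spark's source filters.
  sealed trait Filter
  final case class In(attribute: String, values: Array[Any]) extends Filter
  final case class EqualTo(attribute: String, value: Any) extends Filter

  // Assumed value of HoodieRecord.COMMIT_TIME_METADATA_FIELD.
  val CommitTimeField = "_hoodie_commit_time"

  // Single place that knows how the incremental window is encoded in the filters.
  def commitTimesFromFilters(filters: Seq[Filter]): Set[String] =
    filters.collect {
      case In(attr, values) if attr == CommitTimeField =>
        values.filter(_ != null).map(_.toString).toSet
    }.flatten.toSet

  // The window end becomes a thin wrapper over the shared helper.
  def incrementalWindowEnd(filters: Seq[Filter]): Option[String] = {
    val commitTimes = commitTimesFromFilters(filters)
    if (commitTimes.isEmpty) None else Some(commitTimes.max)
  }
}
```

With this split, `buildCommitTimeRowFilter` could call `commitTimesFromFilters` directly for its allowed set, so both callers follow any future change to the filter shape.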
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala:
##########
@@ -158,6 +158,76 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
 
   override def shouldIncludeLogFiles(): Boolean = fullTableScan
 
+  /**
+   * Collects the incremental file slices for the given modified partitions.
+   *
+   * Each returned file slice carries its base file plus all of its log files so the
+   * [[org.apache.hudi.common.table.read.HoodieFileGroupReader]] can perform a correct runtime merge.
+   * Resolving the current value of a record changed in the incremental window requires the full file
+   * slice, not just the files written within the window:
+   *  - EVENT_TIME_ORDERING: a record written in the window may carry a lower ordering value than the
+   *    version already in the base/earlier-log, so the existing version wins; a window-only view
+   *    cannot determine the winner.
+   *  - Partial updates: a window log block holds only the changed columns, so the unchanged columns
+   *    must be filled in from the base file.
+   * (HUDI #18943.)
+   *
+   * The view is built from the (modified) partition listing (which includes the latest base file of
+   * each file group, honoring the metadata table when enabled) and scoped back to the file groups
+   * actually touched in the window (see [[affectedFileGroupIds]]) so untouched file groups are not
+   * read. The commit-time record filter applied during the scan still restricts the returned records
+   * to the incremental window.
+   *
+   * The view timeline is bounded to instants at or before `latestCommit` (the window's last commit)
+   * so that base/log files written by later commits are not visible. Without this bound a record
+   * updated again after the window would be merged with those later log files and its merged commit
+   * time would fall outside the window, dropping the in-window change from the result.
+   */
+  private def collectIncrementalFileSlices(partitionPaths: Seq[String], latestCommit: String): Seq[FileSlice] = {
+    val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext))
+    val fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(
+      engineContext, metaClient, fileIndex.metadataConfig, timeline.findInstantsBeforeOrEquals(latestCommit))
+    try {
+      partitionPaths.flatMap { relativePartitionPath =>
+        fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala

Review Comment:
   🤖 nit: `fs` conventionally means `FileSystem`/storage throughout Hudi, so
   using it for a `FileSlice` lambda parameter here could trip up a reader. Could
   you rename it to `slice` or `fileSlice`?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala:
##########
@@ -158,6 +158,76 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
 
   override def shouldIncludeLogFiles(): Boolean = fullTableScan
 
+  /**
+   * Collects the incremental file slices for the given modified partitions.
+   *
+   * Each returned file slice carries its base file plus all of its log files so the
+   * [[org.apache.hudi.common.table.read.HoodieFileGroupReader]] can perform a correct runtime merge.
+   * Resolving the current value of a record changed in the incremental window requires the full file
+   * slice, not just the files written within the window:
+   *  - EVENT_TIME_ORDERING: a record written in the window may carry a lower ordering value than the
+   *    version already in the base/earlier-log, so the existing version wins; a window-only view
+   *    cannot determine the winner.
+   *  - Partial updates: a window log block holds only the changed columns, so the unchanged columns
+   *    must be filled in from the base file.
+   * (HUDI #18943.)
+   *
+   * The view is built from the (modified) partition listing (which includes the latest base file of
+   * each file group, honoring the metadata table when enabled) and scoped back to the file groups
+   * actually touched in the window (see [[affectedFileGroupIds]]) so untouched file groups are not
+   * read. The commit-time record filter applied during the scan still restricts the returned records
+   * to the incremental window.
+   *
+   * The view timeline is bounded to instants at or before `latestCommit` (the window's last commit)
+   * so that base/log files written by later commits are not visible. Without this bound a record
+   * updated again after the window would be merged with those later log files and its merged commit
+   * time would fall outside the window, dropping the in-window change from the result.
+   */
+  private def collectIncrementalFileSlices(partitionPaths: Seq[String], latestCommit: String): Seq[FileSlice] = {
+    val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext))
+    val fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(
+      engineContext, metaClient, fileIndex.metadataConfig, timeline.findInstantsBeforeOrEquals(latestCommit))
+    try {
+      partitionPaths.flatMap { relativePartitionPath =>
+        fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
+          .filter(fs => affectedFileGroupIds.contains(fs.getFileId))
+      }
+    } finally {
+      fsView.close()
+    }
+  }

Review Comment:
   🤖 Just to double-check the scope: the PR description says only the V2
   listing changed, but `HoodieFileGroupReaderBasedFileFormat` is the read path
   for both V1 and V2. V1's listing still builds the view from
   `affectedFilesInCommits` only (no pre-window base file), so the runtime merge
   sees only log records and the partial-update / EVENT_TIME_ORDERING bugs remain
   on V1. Is that intentional (V1 = table version < 8, deprecated path), or
   should the V1 listing get the same treatment in a follow-up?
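
For reference, a toy model of why a window-only view gives wrong answers in those two cases. `Rec`, `mergeByEventTime` and `mergePartial` are hypothetical and much simpler than Hudi's real merge logic. The tie-breaking rule (the incoming record wins on equal ordering values) is an assumption made only for this sketch.

```scala
object WindowMergeSketch {
  // Hypothetical record model: an ordering value plus column values.
  final case class Rec(ordering: Long, columns: Map[String, String])

  // Event-time ordering: the existing record wins only if its ordering value
  // is strictly higher. Without the existing record there is nothing to compare.
  def mergeByEventTime(existing: Option[Rec], incoming: Rec): Rec =
    existing match {
      case Some(old) if old.ordering > incoming.ordering => old
      case _                                             => incoming
    }

  // Partial update: the update carries only changed columns, the rest must
  // come from the base record. Without a base, those columns are simply absent.
  def mergePartial(base: Option[Rec], update: Rec): Rec = {
    val baseColumns = base.map(_.columns).getOrElse(Map.empty[String, String])
    Rec(update.ordering, baseColumns ++ update.columns)
  }
}
```

Passing `None` for the base models a listing that omits the pre-window base file: the late-arriving record wrongly wins, and the partial update loses its unchanged columns.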
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>


