hudi-agent commented on code in PR #19005:
URL: https://github.com/apache/hudi/pull/19005#discussion_r3423027372
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala:
##########
@@ -194,8 +270,7 @@ trait HoodieIncrementalRelationV2Trait extends HoodieBaseRelation {
val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.key,
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.defaultValue).toBoolean
- fallbackToFullTableScan && (startInstantArchived
- || affectedFilesInCommits.asScala.exists(fileStatus => !metaClient.getStorage.exists(fileStatus.getPath)))
+ fallbackToFullTableScan && (startInstantArchived || hasMissingAffectedFiles)
Review Comment:
🤖 With `fallbackToFullTableScan=false`, `hasMissingAffectedFiles` is now
also evaluated in `collectFileSplits`/`listFileSplits` to choose between the
new and legacy paths. That triggers an `exists()` call per affected file at
plan time, whereas the old `&&` short-circuited before reaching the check.
This could be slow for wide windows on object stores (one HEAD request per
file). The lazy val avoids repeated evaluation, so it is probably fine, but it
is worth confirming this matches the intent.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
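
To make the concern concrete, here is a minimal, dependency-free sketch of the two evaluation paths. `Planner`, `useLegacyPath`, and the `existsCalls` counter are hypothetical names for illustration only and are not part of the PR; the `exists` function stands in for `metaClient.getStorage.exists`.

```scala
object LazyExistsSketch {
  // Hypothetical stand-in for the relation; `exists` models a storage probe.
  final class Planner(fallbackEnabled: Boolean,
                      startInstantArchived: Boolean,
                      paths: Seq[String],
                      exists: String => Boolean) {
    // Counts storage probes so the cost of each path is observable.
    var existsCalls: Int = 0

    // Evaluated at most once, on first access.
    lazy val hasMissingAffectedFiles: Boolean =
      paths.exists { p => existsCalls += 1; !exists(p) }

    // `&&` short-circuits: with fallback disabled, no probe is issued here.
    def shouldFallback: Boolean =
      fallbackEnabled && (startInstantArchived || hasMissingAffectedFiles)

    // Hypothetical path selection: forces the probe regardless of the fallback flag.
    def useLegacyPath: Boolean = hasMissingAffectedFiles
  }
}
```

With the fallback disabled, `shouldFallback` alone issues no probes, while the first call to `useLegacyPath` issues up to one probe per affected file; later calls reuse the cached result.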
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -353,6 +388,48 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
}
}
+ /**
+ * The incremental window end (latest commit time in the window), extracted from the `In` filter on
+ * the commit-time metadata field in `requiredFilters`. Used to bound the merge inputs so that log
+ * blocks committed after the window are not merged. Returns None when no such filter is present.
+ */
+ private def incrementalWindowEnd(requiredFilters: Seq[Filter]): Option[String] = {
+ val commitTimes: Seq[String] = requiredFilters.collect {
+ case In(attr, values) if attr == HoodieRecord.COMMIT_TIME_METADATA_FIELD =>
+ values.filter(_ != null).map(_.toString).toSeq
+ }.flatten
+ if (commitTimes.isEmpty) None else Some(commitTimes.max)
+ }
+
+ /**
+ * Builds a predicate over merged rows that keeps only the records whose `_hoodie_commit_time`
+ * falls within the incremental query window encoded by `requiredFilters` (an `In` filter on the
+ * commit-time metadata field, produced by the incremental relations). Returns None when no such
+ * filter can be built (e.g. the commit-time field is not part of the read schema), in which case
+ * the caller keeps the original read-time filter pushdown.
+ */
+ private def buildCommitTimeRowFilter(requiredFilters: Seq[Filter],
+ readSchema: StructType): Option[JPredicate[InternalRow]] = {
+ if (requiredFilters.isEmpty) {
+ None
+ } else {
+ readSchema.getFieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD).flatMap { idx =>
+ val allowedCommitTimes: Set[String] = requiredFilters.collect {
+ case In(attr, values) if attr == HoodieRecord.COMMIT_TIME_METADATA_FIELD =>
+ values.filter(_ != null).map(_.toString).toSet
+ }.flatten.toSet
+ if (allowedCommitTimes.isEmpty) {
+ None
+ } else {
+ Some(new JPredicate[InternalRow] {
+ override def test(row: InternalRow): Boolean =
+ !row.isNullAt(idx) && allowedCommitTimes.contains(row.getUTF8String(idx).toString)
Review Comment:
🤖 This `row.getUTF8String(idx).toString` allocates a Java `String` for every
merged row in the file group. Would it be worth pre-converting
`allowedCommitTimes` to `Set[UTF8String]` once and comparing `UTF8String`
directly to skip the per-row allocation? Could matter on wide incremental
windows.
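
A minimal sketch of what that pre-conversion could look like, assuming Spark's catalyst and unsafe modules are on the classpath. `CommitTimeRowFilterSketch` and `commitTimePredicate` are hypothetical names, not code from the PR; the sketch relies only on `UTF8String.fromString` and on `UTF8String` implementing `equals`/`hashCode`.

```scala
import java.util.function.{Predicate => JPredicate}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

object CommitTimeRowFilterSketch {
  // Hypothetical helper: builds the row predicate from an already-extracted set of commit times.
  def commitTimePredicate(allowedCommitTimes: Set[String], idx: Int): JPredicate[InternalRow] = {
    // Convert once, outside the per-row path.
    val allowed: Set[UTF8String] = allowedCommitTimes.map(UTF8String.fromString)
    new JPredicate[InternalRow] {
      // Compares UTF8String values directly, so no java.lang.String is created per row.
      override def test(row: InternalRow): Boolean =
        !row.isNullAt(idx) && allowed.contains(row.getUTF8String(idx))
    }
  }
}
```

The set lookup hashes the row's `UTF8String` bytes instead of decoding them into a `String`; whether that is measurably faster for a given window size would need a benchmark.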
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -353,6 +388,48 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
}
}
+ /**
+ * The incremental window end (latest commit time in the window), extracted from the `In` filter on
+ * the commit-time metadata field in `requiredFilters`. Used to bound the merge inputs so that log
+ * blocks committed after the window are not merged. Returns None when no such filter is present.
+ */
+ private def incrementalWindowEnd(requiredFilters: Seq[Filter]): Option[String] = {
+ val commitTimes: Seq[String] = requiredFilters.collect {
+ case In(attr, values) if attr == HoodieRecord.COMMIT_TIME_METADATA_FIELD =>
+ values.filter(_ != null).map(_.toString).toSeq
+ }.flatten
+ if (commitTimes.isEmpty) None else Some(commitTimes.max)
+ }
+
+ /**
+ * Builds a predicate over merged rows that keeps only the records whose `_hoodie_commit_time`
+ * falls within the incremental query window encoded by `requiredFilters` (an `In` filter on the
+ * commit-time metadata field, produced by the incremental relations). Returns None when no such
+ * filter can be built (e.g. the commit-time field is not part of the read schema), in which case
+ * the caller keeps the original read-time filter pushdown.
+ */
+ private def buildCommitTimeRowFilter(requiredFilters: Seq[Filter],
+ readSchema: StructType): Option[JPredicate[InternalRow]] = {
+ if (requiredFilters.isEmpty) {
+ None
+ } else {
+ readSchema.getFieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD).flatMap { idx =>
+ val allowedCommitTimes: Set[String] = requiredFilters.collect {
Review Comment:
🤖 nit: the `case In(attr, values) if attr ==
HoodieRecord.COMMIT_TIME_METADATA_FIELD => values.filter(_ !=
null).map(_.toString)` pattern is duplicated in both `incrementalWindowEnd`
(line 397) and here. Have you considered pulling it into a small private
helper like `commitTimesFromFilters(filters: Seq[Filter]): Set[String]`, so
the two callers stay in sync if the filter shape ever changes?
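
One possible shape for that helper, as a sketch assuming Spark SQL is on the classpath. The enclosing object name is hypothetical, and the field name is written as a local constant on the assumption that `HoodieRecord.COMMIT_TIME_METADATA_FIELD` resolves to `_hoodie_commit_time`, as the doc comment in the diff suggests.

```scala
import org.apache.spark.sql.sources.{Filter, In}

object CommitTimeFilterSketch {
  // Assumed value of HoodieRecord.COMMIT_TIME_METADATA_FIELD.
  val CommitTimeField = "_hoodie_commit_time"

  // Single place that knows the shape of the commit-time `In` filter.
  def commitTimesFromFilters(filters: Seq[Filter]): Set[String] =
    filters.collect {
      case In(attr, values) if attr == CommitTimeField =>
        values.filter(_ != null).map(_.toString).toSet
    }.flatten.toSet

  // First caller: the window end is the largest commit time, if any.
  def incrementalWindowEnd(filters: Seq[Filter]): Option[String] = {
    val commitTimes = commitTimesFromFilters(filters)
    if (commitTimes.isEmpty) None else Some(commitTimes.max)
  }
}
```

`buildCommitTimeRowFilter` could then call `commitTimesFromFilters(requiredFilters)` in place of its own `collect` block.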
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala:
##########
@@ -158,6 +158,76 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
override def shouldIncludeLogFiles(): Boolean = fullTableScan
+ /**
+ * Collects the incremental file slices for the given modified partitions.
+ *
+ * Each returned file slice carries its base file plus all of its log files so the
+ * [[org.apache.hudi.common.table.read.HoodieFileGroupReader]] can perform a correct runtime merge.
+ * Resolving the current value of a record changed in the incremental window requires the full file
+ * slice, not just the files written within the window:
+ * - EVENT_TIME_ORDERING: a record written in the window may carry a lower ordering value than the
+ * version already in the base/earlier-log, so the existing version wins; a window-only view
+ * cannot determine the winner.
+ * - Partial updates: a window log block holds only the changed columns, so the unchanged columns
+ * must be filled in from the base file.
+ * (HUDI #18943.)
+ *
+ * The view is built from the (modified) partition listing (which includes the latest base file of
+ * each file group, honoring the metadata table when enabled) and scoped back to the file groups
+ * actually touched in the window (see [[affectedFileGroupIds]]) so untouched file groups are not
+ * read. The commit-time record filter applied during the scan still restricts the returned records
+ * to the incremental window.
+ *
+ * The view timeline is bounded to instants at or before `latestCommit` (the window's last commit)
+ * so that base/log files written by later commits are not visible. Without this bound a record
+ * updated again after the window would be merged with those later log files and its merged commit
+ * time would fall outside the window, dropping the in-window change from the result.
+ */
+ private def collectIncrementalFileSlices(partitionPaths: Seq[String], latestCommit: String): Seq[FileSlice] = {
+ val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext))
+ val fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(
+ engineContext, metaClient, fileIndex.metadataConfig, timeline.findInstantsBeforeOrEquals(latestCommit))
+ try {
+ partitionPaths.flatMap { relativePartitionPath =>
+ fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
Review Comment:
🤖 nit: `fs` conventionally means `FileSystem`/storage throughout Hudi, so
using it as the `FileSlice` lambda parameter in the `.filter(fs => ...)` call
that follows could trip up a reader. Could you rename it to `slice` or
`fileSlice`?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala:
##########
@@ -158,6 +158,76 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
override def shouldIncludeLogFiles(): Boolean = fullTableScan
+ /**
+ * Collects the incremental file slices for the given modified partitions.
+ *
+ * Each returned file slice carries its base file plus all of its log files so the
+ * [[org.apache.hudi.common.table.read.HoodieFileGroupReader]] can perform a correct runtime merge.
+ * Resolving the current value of a record changed in the incremental window requires the full file
+ * slice, not just the files written within the window:
+ * - EVENT_TIME_ORDERING: a record written in the window may carry a lower ordering value than the
+ * version already in the base/earlier-log, so the existing version wins; a window-only view
+ * cannot determine the winner.
+ * - Partial updates: a window log block holds only the changed columns, so the unchanged columns
+ * must be filled in from the base file.
+ * (HUDI #18943.)
+ *
+ * The view is built from the (modified) partition listing (which includes the latest base file of
+ * each file group, honoring the metadata table when enabled) and scoped back to the file groups
+ * actually touched in the window (see [[affectedFileGroupIds]]) so untouched file groups are not
+ * read. The commit-time record filter applied during the scan still restricts the returned records
+ * to the incremental window.
+ *
+ * The view timeline is bounded to instants at or before `latestCommit` (the window's last commit)
+ * so that base/log files written by later commits are not visible. Without this bound a record
+ * updated again after the window would be merged with those later log files and its merged commit
+ * time would fall outside the window, dropping the in-window change from the result.
+ */
+ private def collectIncrementalFileSlices(partitionPaths: Seq[String], latestCommit: String): Seq[FileSlice] = {
+ val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext))
+ val fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(
+ engineContext, metaClient, fileIndex.metadataConfig, timeline.findInstantsBeforeOrEquals(latestCommit))
+ try {
+ partitionPaths.flatMap { relativePartitionPath =>
+ fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
+ .filter(fs => affectedFileGroupIds.contains(fs.getFileId))
+ }
+ } finally {
+ fsView.close()
+ }
+ }
Review Comment:
🤖 Just to double-check the scope: the PR description says only the V2
listing changed, but `HoodieFileGroupReaderBasedFileFormat` is the read path
for both V1 and V2. V1's listing still builds the view from
`affectedFilesInCommits` only (no pre-window base file), so the runtime merge
sees only log records and the partial-update / EVENT_TIME_ORDERING bugs remain
on V1. Is that intentional (V1 = table-version < 8, deprecated path), or should
V1 listing get the same treatment in a follow-up?