This is an automated email from the ASF dual-hosted git repository.
richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 10cf0fdf [AURON #2030] Add Native Scan Support for Apache Hudi
Copy-On-Write Tables. (#2031)
10cf0fdf is described below
commit 10cf0fdf2b1d3872b9415112e2d9c105b933d198
Author: slfan1989 <[email protected]>
AuthorDate: Fri Mar 20 10:33:34 2026 +0800
[AURON #2030] Add Native Scan Support for Apache Hudi Copy-On-Write Tables.
(#2031)
### Which issue does this PR close?
Closes #2030
### Rationale for this change
This PR adds native scan support for Hudi Copy-On-Write (COW) tables,
enabling Auron to accelerate Hudi table reads by converting
`FileSourceScanExec` operations to native Parquet/ORC scan
implementations.
### What changes are included in this PR?
#### 1. **New Module: `thirdparty/auron-hudi`**
- **`HudiConvertProvider`**: Implements `AuronConvertProvider` SPI to
intercept and convert Hudi `FileSourceScanExec` to native scans
- Detects Hudi file formats (`HoodieParquetFileFormat`,
`HoodieOrcFileFormat`)
- Converts to `NativeParquetScanExec` or `NativeOrcScanExec`
- Handles timestamp fallback logic automatically
- **`HudiScanSupport`**: Core detection and validation logic
- File format recognition with `NewHoodie*` format rejection
- Table type resolution via multi-source metadata fallback:
- Options → Catalog → `.hoodie/hoodie.properties`
- MOR table detection and rejection
- Time travel query detection (via `as.of.instant`, `as.of.timestamp`
options)
- FileIndex class hierarchy verification
#### 2. **Configuration**
- Added `spark.auron.enable.hudi.scan` config option (default: `true`)
- Respects existing Parquet/ORC timestamp scanning configurations
- Runtime Spark version validation (3.0–3.5 only)
#### 3. **Build & Integration**
- **Maven**: New profile `hudi-0.15` with enforcer rules
- Validates `hudiEnabled=true` property
- Restricts Spark to 3.0–3.5
- Pins Hudi version to 0.15.0
- **Build Script**: Enhanced `auron-build.sh`
- Added `--hudi <VERSION>` parameter
- Version compatibility validation
- Auto-enables `hudiEnabled` property
- **CI/CD**: New workflow `.github/workflows/hudi.yml`
- Matrix testing: Spark 3.0–3.5 × JDK 8/17/21 × Scala 2.12
- Independent Hudi test pipeline
### Are there any user-facing changes?
#### New Configuration Option
```scala
// Enable Hudi native scan (enabled by default)
spark.conf.set("spark.auron.enable.hudi.scan", "true")
```
### How was this patch tested?
Added JUnit tests.
Signed-off-by: slfan1989 <[email protected]>
---
.github/workflows/hudi.yml | 108 +++++++
auron-build.sh | 31 +-
dev/reformat | 2 +-
pom.xml | 10 +
.../configuration/SparkAuronConfiguration.java | 6 +
.../apache/spark/sql/auron/AuronConverters.scala | 5 +-
thirdparty/auron-hudi/pom.xml | 116 +++++++
...org.apache.spark.sql.auron.AuronConvertProvider | 18 ++
.../spark/sql/auron/hudi/HudiConvertProvider.scala | 87 ++++++
.../spark/sql/auron/hudi/HudiScanSupport.scala | 257 ++++++++++++++++
.../src/test/resources/log4j2.properties | 35 +++
.../sql/auron/hudi/HudiScanSupportSuite.scala | 337 +++++++++++++++++++++
12 files changed, 1009 insertions(+), 3 deletions(-)
diff --git a/.github/workflows/hudi.yml b/.github/workflows/hudi.yml
new file mode 100644
index 00000000..48f3831d
--- /dev/null
+++ b/.github/workflows/hudi.yml
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name: Hudi
+
+on:
+ workflow_dispatch:
+ push:
+ branches:
+ - master
+ - branch-*
+ pull_request:
+ branches:
+ - master
+ - branch-*
+
+concurrency:
+ group: hudi-${{ github.workflow }}-${{ github.event.pull_request.number ||
github.ref }}
+ cancel-in-progress: true
+
+jobs:
+ test-hudi:
+ name: Test Hudi (${{ matrix.sparkver }} / JDK${{ matrix.javaver }} /
Scala${{ matrix.scalaver }})
+ runs-on: ubuntu-24.04
+ strategy:
+ fail-fast: false
+ matrix:
+ include:
+ - sparkver: "3.0"
+ scalaver: "2.12"
+ javaver: "8"
+ hudiver: "0.15"
+ - sparkver: "3.1"
+ scalaver: "2.12"
+ javaver: "8"
+ hudiver: "0.15"
+ - sparkver: "3.2"
+ scalaver: "2.12"
+ javaver: "8"
+ hudiver: "0.15"
+ - sparkver: "3.3"
+ scalaver: "2.12"
+ javaver: "8"
+ hudiver: "0.15"
+ - sparkver: "3.4"
+ scalaver: "2.12"
+ javaver: "17"
+ hudiver: "0.15"
+ - sparkver: "3.5"
+ scalaver: "2.12"
+ javaver: "17"
+ hudiver: "0.15"
+ - sparkver: "3.5"
+ scalaver: "2.12"
+ hudiver: "0.15"
+ javaver: "21"
+
+ steps:
+ - name: Checkout Auron
+ uses: actions/checkout@v6
+
+ - name: Setup Java and Maven cache
+ uses: actions/setup-java@v5
+ with:
+ distribution: 'adopt-hotspot'
+ java-version: ${{ matrix.javaver }}
+ cache: 'maven'
+
+ - name: Build dependencies (skip tests)
+ run: >
+ ./build/mvn -B install
+ -pl thirdparty/auron-hudi
+ -am
+ -Pscala-${{ matrix.scalaver }}
+ -Pspark-${{ matrix.sparkver }}
+ -Phudi-${{ matrix.hudiver }}
+ -Prelease
+ -DskipTests
+
+ - name: Test Hudi Module
+ run: >
+ ./build/mvn -B test
+ -pl thirdparty/auron-hudi
+ -Pscala-${{ matrix.scalaver }}
+ -Pspark-${{ matrix.sparkver }}
+ -Phudi-${{ matrix.hudiver }}
+ -Prelease
+
+ - name: Upload reports
+ if: failure()
+ uses: actions/upload-artifact@v6
+ with:
+ name: auron-hudi-${{ matrix.sparkver }}-hudi${{ matrix.hudiver
}}-jdk${{ matrix.javaver }}-test-report
+ path: thirdparty/auron-hudi/target/surefire-reports
diff --git a/auron-build.sh b/auron-build.sh
index a88ee697..25cce20a 100755
--- a/auron-build.sh
+++ b/auron-build.sh
@@ -38,6 +38,7 @@ SUPPORTED_UNIFFLE_VERSIONS=("0.10")
SUPPORTED_PAIMON_VERSIONS=("1.2")
SUPPORTED_FLINK_VERSIONS=("1.18")
SUPPORTED_ICEBERG_VERSIONS=("1.10.1")
+SUPPORTED_HUDI_VERSIONS=("0.15")
# -----------------------------------------------------------------------------
# Function: print_help
@@ -64,6 +65,7 @@ print_help() {
IFS=','; echo " --uniffle <VERSION> Specify Uniffle version (e.g.
${SUPPORTED_UNIFFLE_VERSIONS[*]})"; unset IFS
IFS=','; echo " --paimon <VERSION> Specify Paimon version (e.g.
${SUPPORTED_PAIMON_VERSIONS[*]})"; unset IFS
IFS=','; echo " --iceberg <VERSION> Specify Iceberg version (e.g.
${SUPPORTED_ICEBERG_VERSIONS[*]})"; unset IFS
+ IFS=','; echo " --hudi <VERSION> Specify Hudi version (e.g.
${SUPPORTED_HUDI_VERSIONS[*]})"; unset IFS
echo " -h, --help Show this help message"
echo
@@ -78,7 +80,8 @@ print_help() {
"--celeborn ${SUPPORTED_CELEBORN_VERSIONS[*]: -1}" \
"--uniffle ${SUPPORTED_UNIFFLE_VERSIONS[*]: -1}" \
"--paimon ${SUPPORTED_PAIMON_VERSIONS[*]: -1}" \
- "--iceberg ${SUPPORTED_ICEBERG_VERSIONS[*]: -1}"
+ "--iceberg ${SUPPORTED_ICEBERG_VERSIONS[*]: -1}" \
+ "--hudi ${SUPPORTED_HUDI_VERSIONS[*]: -1}"
exit 0
}
@@ -135,6 +138,7 @@ CELEBORN_VER=""
UNIFFLE_VER=""
PAIMON_VER=""
ICEBERG_VER=""
+HUDI_VER=""
# -----------------------------------------------------------------------------
# Section: Argument Parsing
@@ -301,6 +305,27 @@ while [[ $# -gt 0 ]]; do
exit 1
fi
;;
+ --hudi)
+ if [[ -n "$2" && "$2" != -* ]]; then
+ HUDI_VER="$2"
+ if ! check_supported_version "$HUDI_VER"
"${SUPPORTED_HUDI_VERSIONS[@]}"; then
+ print_invalid_option_error Hudi "$HUDI_VER"
"${SUPPORTED_HUDI_VERSIONS[@]}"
+ fi
+ if [ -z "$SPARK_VER" ]; then
+ echo "ERROR: Building hudi requires spark at the same time,
and only Spark versions 3.0 to 3.5 are supported."
+ exit 1
+ fi
+ if [ "$SPARK_VER" != "3.0" ] && [ "$SPARK_VER" != "3.1" ] && [
"$SPARK_VER" != "3.2" ] && [ "$SPARK_VER" != "3.3" ] && [ "$SPARK_VER" != "3.4"
] && [ "$SPARK_VER" != "3.5" ]; then
+ echo "ERROR: Building hudi requires spark versions are 3.0
to 3.5."
+ exit 1
+ fi
+ shift 2
+ else
+ IFS=','; echo "ERROR: Missing argument for --hudi," \
+ "specify one of: ${SUPPORTED_HUDI_VERSIONS[*]}" >&2; unset IFS
+ exit 1
+ fi
+ ;;
--flinkver)
if [[ -n "$2" && "$2" != -* ]]; then
FLINK_VER="$2"
@@ -437,6 +462,9 @@ fi
if [[ -n "$ICEBERG_VER" ]]; then
BUILD_ARGS+=("-Piceberg-$ICEBERG_VER")
fi
+if [[ -n "$HUDI_VER" ]]; then
+ BUILD_ARGS+=("-Phudi-$HUDI_VER")
+fi
# Configure Maven build threads:
# - local builds default to Maven's single-threaded behavior
@@ -473,6 +501,7 @@ get_build_info() {
"paimon.version") echo "${PAIMON_VER}" ;;
"flink.version") echo "${FLINK_VER}" ;;
"iceberg.version") echo "${ICEBERG_VER}" ;;
+ "hudi.version") echo "${HUDI_VER}" ;;
"build.timestamp") echo "$(date -u +"%Y-%m-%dT%H:%M:%SZ")" ;;
*) echo "" ;;
esac
diff --git a/dev/reformat b/dev/reformat
index b82816b6..e3f639e5 100755
--- a/dev/reformat
+++ b/dev/reformat
@@ -146,7 +146,7 @@ sparkver=spark-3.5
prepare_for_spark "${sparkver}"
for celebornver in celeborn-0.5 celeborn-0.6
do
- run_maven_format -P"${sparkver}" -Pceleborn,"${celebornver}"
-Puniffle,uniffle-0.10 -Ppaimon,paimon-1.2 -Pflink-1.18 -Piceberg-1.10.1
+ run_maven_format -P"${sparkver}" -Pceleborn,"${celebornver}"
-Puniffle,uniffle-0.10 -Ppaimon,paimon-1.2 -Pflink-1.18 -Piceberg-1.10.1
-Phudi-0.15
done
diff --git a/pom.xml b/pom.xml
index dce4b3cc..2c6df25e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1319,6 +1319,16 @@
</properties>
</profile>
+ <profile>
+ <id>hudi-0.15</id>
+ <modules>
+ <module>thirdparty/auron-hudi</module>
+ </modules>
+ <properties>
+ <hudiVersion>0.15.0</hudiVersion>
+ </properties>
+ </profile>
+
<profile>
<id>flink-1.18</id>
<modules>
diff --git
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
index a1bf8e3c..bc46ed31 100644
---
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
+++
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
@@ -327,6 +327,12 @@ public class SparkAuronConfiguration extends
AuronConfiguration {
.withDescription("Enable Iceberg scan operation conversion to
native Auron implementations.")
.withDefaultValue(true);
+ public static final ConfigOption<Boolean> ENABLE_HUDI_SCAN = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.hudi.scan")
+ .withCategory("Operator Supports")
+ .withDescription("Enable Hudi scan operation conversion to native
Auron implementations.")
+ .withDefaultValue(true);
+
public static final ConfigOption<Boolean> ENABLE_PROJECT = new
SQLConfOption<>(Boolean.class)
.withKey("auron.enable.project")
.withCategory("Operator Supports")
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
index 72c038a4..b3c55da5 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
@@ -161,7 +161,10 @@ object AuronConverters extends Logging {
case e: BroadcastExchangeExec if enableBroadcastExchange =>
tryConvert(e, convertBroadcastExchangeExec)
case e: FileSourceScanExec if enableScan => // scan
- tryConvert(e, convertFileSourceScanExec)
+ extConvertProviders.find(p => p.isEnabled && p.isSupported(e)) match {
+ case Some(provider) => tryConvert(e, provider.convert)
+ case None => tryConvert(e, convertFileSourceScanExec)
+ }
case e: ProjectExec if enableProject => // project
tryConvert(e, convertProjectExec)
case e: FilterExec if enableFilter => // filter
diff --git a/thirdparty/auron-hudi/pom.xml b/thirdparty/auron-hudi/pom.xml
new file mode 100644
index 00000000..4063065f
--- /dev/null
+++ b/thirdparty/auron-hudi/pom.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.auron</groupId>
+ <artifactId>auron-parent_${scalaVersion}</artifactId>
+ <version>${project.version}</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>auron-hudi_${scalaVersion}</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache Auron Hudi ${hudiVersion} ${scalaVersion}</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.auron</groupId>
+ <artifactId>spark-extension_${scalaVersion}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+
<artifactId>hudi-spark${shortSparkVersion}-bundle_${scalaVersion}</artifactId>
+ <version>${hudiVersion}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scalaVersion}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scalaVersion}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scalaVersion}</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scalaVersion}</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scalaVersion}</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_${scalaVersion}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.auron</groupId>
+ <artifactId>spark-extension-shims-spark_${scalaVersion}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>${maven-enforcer-plugin.version}</version>
+ <executions>
+ <execution>
+ <id>hudi-spark-version-compat</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <requireProperty>
+ <property>shortSparkVersion</property>
+ <regex>^(3\.0|3\.1|3\.2|3\.3|3\.4|3\.5)$</regex>
+ <regexMessage>Hudi integration supports Spark 3.0-3.5 only.
Current: ${shortSparkVersion}</regexMessage>
+ </requireProperty>
+ <requireProperty>
+ <property>hudiVersion</property>
+ <regex>^0\.15\.0$</regex>
+ <regexMessage>Hudi integration supports only Hudi 0.15.0.
Current: ${hudiVersion}</regexMessage>
+ </requireProperty>
+ </rules>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/thirdparty/auron-hudi/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider
b/thirdparty/auron-hudi/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider
new file mode 100644
index 00000000..76be6809
--- /dev/null
+++
b/thirdparty/auron-hudi/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.spark.sql.auron.hudi.HudiConvertProvider
diff --git
a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala
b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala
new file mode 100644
index 00000000..da96d564
--- /dev/null
+++
b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.auron.hudi
+
+import scala.util.Try
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.auron.{AuronConverters, AuronConvertProvider,
NativeConverters, Shims}
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.SparkPlan
+
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
+
+class HudiConvertProvider extends AuronConvertProvider with Logging {
+
+ override def isEnabled: Boolean = {
+ val sparkVersion = org.apache.spark.SPARK_VERSION
+ val versionParts = sparkVersion.split("[\\.-]", 3)
+ val maybeMajor = versionParts.headOption.flatMap(part =>
Try(part.toInt).toOption)
+ val maybeMinor =
+ if (versionParts.length >= 2) Try(versionParts(1).toInt).toOption else
None
+ val supported = (for {
+ major <- maybeMajor
+ minor <- maybeMinor
+ } yield major == 3 && minor >= 0 && minor <= 5).getOrElse(false)
+ SparkAuronConfiguration.ENABLE_HUDI_SCAN.get() && supported
+ }
+
+ override def isSupported(exec: SparkPlan): Boolean = {
+ exec match {
+ case scan: FileSourceScanExec =>
+ // Only handle Hudi-backed file scans; other scans fall through.
+ HudiScanSupport.isSupported(scan)
+ case _ => false
+ }
+ }
+
+ override def convert(exec: SparkPlan): SparkPlan = {
+ exec match {
+ case scan: FileSourceScanExec if HudiScanSupport.isSupported(scan) =>
+ HudiScanSupport.fileFormat(scan) match {
+ case Some(HudiScanSupport.ParquetFormat) =>
+ if (!SparkAuronConfiguration.ENABLE_SCAN_PARQUET.get()) {
+ return exec
+ }
+ // Hudi falls back to Spark when timestamp scanning is disabled.
+ if (!SparkAuronConfiguration.ENABLE_SCAN_PARQUET_TIMESTAMP.get()) {
+ if (scan.requiredSchema.exists(e =>
+ NativeConverters.existTimestampType(e.dataType))) {
+ return exec
+ }
+ }
+ logDebug(s"Applying native parquet scan for Hudi:
${scan.relation.location}")
+
AuronConverters.addRenameColumnsExec(Shims.get.createNativeParquetScanExec(scan))
+ case Some(HudiScanSupport.OrcFormat) =>
+ if (!SparkAuronConfiguration.ENABLE_SCAN_ORC.get()) {
+ return exec
+ }
+ // ORC follows the same timestamp fallback rule as Parquet.
+ if (!SparkAuronConfiguration.ENABLE_SCAN_ORC_TIMESTAMP.get()) {
+ if (scan.requiredSchema.exists(e =>
+ NativeConverters.existTimestampType(e.dataType))) {
+ return exec
+ }
+ }
+ logDebug(s"Applying native ORC scan for Hudi:
${scan.relation.location}")
+
AuronConverters.addRenameColumnsExec(Shims.get.createNativeOrcScanExec(scan))
+ case None => exec
+ }
+ case _ => exec
+ }
+ }
+}
diff --git
a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala
b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala
new file mode 100644
index 00000000..63488d30
--- /dev/null
+++
b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.auron.hudi
+
+import java.net.URI
+import java.util.{Locale, Properties}
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+
+object HudiScanSupport extends Logging {
+ sealed trait HudiFileFormat
+ case object ParquetFormat extends HudiFileFormat
+ case object OrcFormat extends HudiFileFormat
+
+ private val hudiParquetFileFormatSuffix = "HoodieParquetFileFormat"
+ private val newHudiParquetFileFormatSuffix = "NewHoodieParquetFileFormat"
+ private val hudiOrcFileFormatSuffix = "HoodieOrcFileFormat"
+ private val newHudiOrcFileFormatSuffix = "NewHoodieOrcFileFormat"
+ private val morTableTypes = Set("merge_on_read", "mor")
+ private val hudiTableTypeKeys = Seq(
+ "hoodie.datasource.write.table.type",
+ "hoodie.datasource.read.table.type",
+ "hoodie.table.type")
+ private val hudiBaseFileFormatKeys = Seq(
+ "hoodie.table.base.file.format",
+ "hoodie.datasource.write.base.file.format",
+ "hoodie.datasource.write.storage.type")
+
+ def fileFormat(scan: FileSourceScanExec): Option[HudiFileFormat] = {
+ val fileFormatName = scan.relation.fileFormat.getClass.getName
+ val fromClass = fileFormat(fileFormatName)
+ if (fromClass.nonEmpty) {
+ return fromClass
+ }
+ // Spark may report generic Orc/Parquet formats for Hudi; use metadata
fallback
+ // only when the underlying file index indicates a Hudi table.
+ fileFormatFromMeta(scan, catalogTable(scan.relation), fileFormatName)
+ }
+
+ private[hudi] def fileFormat(fileFormatName: String): Option[HudiFileFormat]
= {
+ logDebug(s"Hudi fileFormat resolved to: ${fileFormatName}")
+ if (fileFormatName.endsWith(newHudiParquetFileFormatSuffix) ||
+ fileFormatName.endsWith(newHudiOrcFileFormatSuffix)) {
+ return None
+ }
+ if (fileFormatName.endsWith(hudiParquetFileFormatSuffix)) {
+ return Some(ParquetFormat)
+ }
+ if (fileFormatName.endsWith(hudiOrcFileFormatSuffix)) {
+ return Some(OrcFormat)
+ }
+ None
+ }
+
+ def isSupported(scan: FileSourceScanExec): Boolean =
+ isSupported(fileFormat(scan), scan.relation.options,
catalogTable(scan.relation))
+
+ private[hudi] def isSupported(fileFormatName: String, options: Map[String,
String]): Boolean = {
+ isSupported(fileFormat(fileFormatName), options, None)
+ }
+
+ private[hudi] def isSupported(
+ fileFormat: Option[HudiFileFormat],
+ options: Map[String, String],
+ catalogTable: Option[CatalogTable]): Boolean = {
+ if (fileFormat.isEmpty) {
+ return false
+ }
+ if (hasTimeTravel(options)) {
+ return false
+ }
+
+ val tableType = tableTypeFromOptions(options)
+ .orElse(tableTypeFromCatalog(catalogTable))
+ .orElse(tableTypeFromMeta(options))
+ .map(_.toLowerCase(Locale.ROOT))
+
+ logDebug(s"Hudi tableType resolved to: ${tableType.getOrElse("unknown")}")
+
+ // Only support basic COW tables for the base version.
+ !tableType.exists(morTableTypes.contains)
+ }
+
+ private def tableTypeFromOptions(options: Map[String, String]):
Option[String] = {
+ hudiTableTypeKeys
+ .flatMap(key => options.get(key))
+ .headOption
+ }
+
+ private def baseFileFormatFromOptions(options: Map[String, String]):
Option[String] = {
+ hudiBaseFileFormatKeys
+ .flatMap(key => options.get(key))
+ .headOption
+ }
+
+ private def tableTypeFromMeta(options: Map[String, String]): Option[String]
= {
+ val basePath = options.get("path").map(normalizePath)
+ basePath.flatMap { path =>
+ try {
+ val hadoopConf = SparkSession.active.sessionState.newHadoopConf()
+ val base = new Path(path)
+ val fs = base.getFileSystem(hadoopConf)
+ val propsPath = new Path(base, ".hoodie/hoodie.properties")
+ if (!fs.exists(propsPath)) {
+ if (log.isDebugEnabled()) {
+ logDebug(s"Hudi table properties not found at: $propsPath")
+ }
+ return None
+ }
+ val in = fs.open(propsPath)
+ try {
+ val props = new Properties()
+ props.load(in)
+ Option(props.getProperty("hoodie.table.type"))
+ } finally {
+ in.close()
+ }
+ } catch {
+ case t: Throwable =>
+ if (log.isDebugEnabled()) {
+ logDebug(s"Failed to load hudi table type from $path", t)
+ }
+ None
+ }
+ }
+ }
+
+ private def baseFileFormatFromMeta(options: Map[String, String]):
Option[String] = {
+ val basePath = options.get("path").map(normalizePath)
+ basePath.flatMap { path =>
+ try {
+ val hadoopConf = SparkSession.active.sessionState.newHadoopConf()
+ val base = new Path(path)
+ val fs = base.getFileSystem(hadoopConf)
+ val propsPath = new Path(base, ".hoodie/hoodie.properties")
+ if (!fs.exists(propsPath)) {
+ if (log.isDebugEnabled()) {
+ logDebug(s"Hudi table properties not found at: $propsPath")
+ }
+ return None
+ }
+ val in = fs.open(propsPath)
+ try {
+ val props = new Properties()
+ props.load(in)
+ Option(props.getProperty("hoodie.table.base.file.format"))
+ } finally {
+ in.close()
+ }
+ } catch {
+ case t: Throwable =>
+ if (log.isDebugEnabled()) {
+ logDebug(s"Failed to load hudi base file format from $path", t)
+ }
+ None
+ }
+ }
+ }
+
+ private def baseFileFormatFromCatalog(catalogTable: Option[CatalogTable]):
Option[String] = {
+ catalogTable.flatMap { table =>
+ val props = table.properties ++ table.storage.properties
+ hudiBaseFileFormatKeys.flatMap(props.get).headOption
+ }
+ }
+
+ private def fileFormatFromMeta(
+ scan: FileSourceScanExec,
+ catalogTable: Option[CatalogTable],
+ fileFormatName: String): Option[HudiFileFormat] = {
+ // Avoid treating non-Hudi tables as Hudi when Spark reports generic
formats.
+ if (!isHudiFileIndex(scan.relation.location)) {
+ return None
+ }
+ val baseFormat = baseFileFormatFromOptions(scan.relation.options)
+ .orElse(baseFileFormatFromCatalog(catalogTable))
+ .orElse(baseFileFormatFromMeta(scan.relation.options))
+ .map(_.toLowerCase(Locale.ROOT))
+ baseFormat.flatMap {
+ case "orc" if fileFormatName.contains("OrcFileFormat") => Some(OrcFormat)
+ case "parquet" if fileFormatName.contains("ParquetFileFormat") =>
Some(ParquetFormat)
+ case _ => None
+ }
+ }
+
+ private def tableTypeFromCatalog(catalogTable: Option[CatalogTable]):
Option[String] = {
+ catalogTable.flatMap { table =>
+ val props = table.properties ++ table.storage.properties
+ hudiTableTypeKeys.flatMap(props.get).headOption
+ }
+ }
+
+ private def catalogTable(relation: HadoopFsRelation): Option[CatalogTable] =
{
+ val method = relation.getClass.getMethods.find(_.getName == "catalogTable")
+ method.flatMap { m =>
+ try {
+ m.invoke(relation) match {
+ case opt: Option[_] => opt.asInstanceOf[Option[CatalogTable]]
+ case table: CatalogTable => Some(table)
+ case _ => None
+ }
+ } catch {
+ case _: Throwable => None
+ }
+ }
+ }
+
+ private def isHudiFileIndex(fileIndex: AnyRef): Boolean = {
+ var current: Class[_] = fileIndex.getClass
+ while (current != null) {
+ if (current.getName.endsWith("HoodieFileIndex")) {
+ return true
+ }
+ current = current.getSuperclass
+ }
+ false
+ }
+
+ private def hasTimeTravel(options: Map[String, String]): Boolean = {
+ val keys = options.keys.map(_.toLowerCase(Locale.ROOT))
+ keys.exists {
+ case "as.of.instant" => true
+ case "as.of.timestamp" => true
+ case "hoodie.datasource.read.as.of.instant" => true
+ case "hoodie.datasource.read.as.of.timestamp" => true
+ case _ => false
+ }
+ }
+
+ private def normalizePath(rawPath: String): String = {
+ try {
+ val uri = new URI(rawPath)
+ if (uri.getScheme == null) rawPath else uri.toString
+ } catch {
+ case _: Throwable => rawPath
+ }
+ }
+}
diff --git a/thirdparty/auron-hudi/src/test/resources/log4j2.properties
b/thirdparty/auron-hudi/src/test/resources/log4j2.properties
new file mode 100644
index 00000000..276d5511
--- /dev/null
+++ b/thirdparty/auron-hudi/src/test/resources/log4j2.properties
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+rootLogger.level = info
+rootLogger.appenderRef.file.ref = File
+
+#File Appender
+appender.file.type = File
+appender.file.name = File
+appender.file.fileName = target/auron-hudi-tests.log
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d{HH:mm:ss.SSS} %t %p %c{1}: %m%n%ex
+
+#Console Appender
+appender.console.type = Console
+appender.console.name = STDOUT
+appender.console.target = SYSTEM_OUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss.SSS} %p %c:
%maxLen{%m}{512}%n%ex{8}%n
+appender.console.filter.threshold.type = ThresholdFilter
+appender.console.filter.threshold.level = warn
diff --git
a/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala
b/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala
new file mode 100644
index 00000000..a388507c
--- /dev/null
+++
b/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.auron.hudi
+
+import java.io.File
+import java.io.FileInputStream
+import java.nio.file.Files
+import java.util.Properties
+
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.auron.plan.NativeOrcScanBase
+import org.apache.spark.sql.execution.auron.plan.NativeParquetScanBase
+import org.apache.spark.sql.test.SharedSparkSession
+
+import org.apache.auron.util.SparkVersionUtil
+
/**
 * Test suite for Auron's native scan support on Hudi Copy-On-Write tables.
 *
 * Covers two layers:
 *  - Unit checks of [[HudiScanSupport]]: file-format class-name recognition
 *    (Hoodie parquet/orc accepted, `NewHoodie*` rejected) and table-type
 *    validation (MERGE_ON_READ rejected, default/COW accepted).
 *  - End-to-end checks of [[HudiConvertProvider]]: COW parquet/orc scans are
 *    converted to native scan plans, while time-travel queries and timestamp
 *    columns (with native timestamp scanning disabled via conf below) stay on
 *    the Spark fallback path.
 */
class HudiScanSupportSuite extends SparkFunSuite with SharedSparkSession {

  // Per-suite scratch directory under target/tmp; created lazily because
  // sparkConf (invoked by the test harness) reads warehouseDir from it.
  private lazy val suiteWorkspace: File = {
    val base = new File("target/tmp")
    base.mkdirs()
    Files.createTempDirectory(base.toPath, "auron-hudi-tests").toFile
  }
  // Warehouse location for all tables created in this suite; table paths below
  // (e.g. hudiBaseFileFormat) are resolved relative to this directory.
  private lazy val warehouseDir: String =
    new File(suiteWorkspace, "spark-warehouse").getAbsolutePath

  // Whether Hudi's HoodieCatalog is on the classpath (availability varies by
  // Hudi build). `initialize = false` avoids running static initializers.
  private lazy val hudiCatalogClassAvailable: Boolean = {
    try {
      Class.forName(
        "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
        false,
        Thread.currentThread().getContextClassLoader)
      true
    } catch {
      case _: ClassNotFoundException => false
    }
  }

  /**
   * Builds the test SparkConf: registers both the Auron and Hudi session
   * extensions, routes shuffles through Auron, and deliberately disables
   * native parquet timestamp scanning so the fallback tests below can assert
   * on it. The --add-opens flags are required for Hudi/Kryo reflection on
   * JDK 9+ module systems.
   */
  override protected def sparkConf: SparkConf = {
    if (!suiteWorkspace.exists()) {
      suiteWorkspace.mkdirs()
    }
    new File(warehouseDir).mkdirs()
    val extraJavaOptions =
      "--add-opens=java.base/java.lang=ALL-UNNAMED " +
        "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED " +
        "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED " +
        "--add-opens=java.base/java.io=ALL-UNNAMED " +
        "--add-opens=java.base/java.net=ALL-UNNAMED " +
        "--add-opens=java.base/java.nio=ALL-UNNAMED " +
        "--add-opens=java.base/java.util=ALL-UNNAMED " +
        "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED " +
        "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED " +
        "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED " +
        "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED " +
        "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED " +
        "--add-opens=java.base/sun.security.action=ALL-UNNAMED " +
        "--add-opens=java.base/sun.security.tools.keytool=ALL-UNNAMED " +
        "--add-opens=java.base/sun.security.x509=ALL-UNNAMED " +
        "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED " +
        "-Djdk.reflect.useDirectMethodHandle=false " +
        "-Dio.netty.tryReflectionSetAccessible=true"
    val conf = super.sparkConf
      .set(
        "spark.sql.extensions",
        "org.apache.spark.sql.auron.AuronSparkSessionExtension," +
          "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .set(
        "spark.shuffle.manager",
        "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager")
      .set("spark.auron.enable", "true")
      .set("spark.sql.warehouse.dir", warehouseDir)
      .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.driver.extraJavaOptions", extraJavaOptions)
      .set("spark.executor.extraJavaOptions", extraJavaOptions)
      // Disable native timestamp scan to validate fallback behavior in tests.
      .set("spark.auron.enable.scan.parquet.timestamp", "false")
      .set("spark.auron.ui.enabled", "false")
      .set("spark.ui.enabled", "false")
    // Only install HoodieCatalog when the class is actually present; otherwise
    // keep Spark's default session catalog and note it in the test log.
    if (hudiCatalogClassAvailable) {
      conf.set(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    } else {
      info(
        "Hudi HoodieCatalog not found on the classpath; leaving spark.sql.catalog.spark_catalog as default.")
    }
    conf
  }

  /** Runs `f`, then drops `tableName` even when `f` throws. */
  private def withTable(tableName: String)(f: => Unit): Unit = {
    try {
      f
    } finally {
      spark.sql(s"DROP TABLE IF EXISTS $tableName")
    }
  }

  // Asserts that at least one native parquet scan node is present in `plan`.
  private def assertHasNativeParquetScan(plan: org.apache.spark.sql.execution.SparkPlan): Unit = {
    assert(plan.find(_.isInstanceOf[NativeParquetScanBase]).nonEmpty)
  }

  // Asserts that at least one native ORC scan node is present in `plan`.
  private def assertHasNativeOrcScan(plan: org.apache.spark.sql.execution.SparkPlan): Unit = {
    assert(plan.find(_.isInstanceOf[NativeOrcScanBase]).nonEmpty)
  }

  // Asserts the executed plan contains no native parquet scan, i.e. the query
  // stayed on the Spark fallback path.
  private def assertNoNativeParquetScan(df: org.apache.spark.sql.DataFrame): Unit = {
    assert(df.queryExecution.executedPlan.collect { case _: NativeParquetScanBase =>
      true
    }.isEmpty)
  }

  /**
   * Logs (via `info`) which file formats and scan nodes appear in the
   * materialized plan of `df`. Purely diagnostic — makes no assertions.
   */
  private def logFileFormats(df: org.apache.spark.sql.DataFrame): Unit = {
    val plan = materializedPlan(df)
    val nodes = plan.collect { case p => p }
    val scans = plan.collect { case scan: org.apache.spark.sql.execution.FileSourceScanExec =>
      scan
    }
    val formats = scans.map(_.relation.fileFormat.getClass.getName)
    val nativeScans = plan.collect {
      case scan: NativeParquetScanBase => scan.nodeName
      case scan: NativeOrcScanBase => scan.nodeName
    }
    // Secondary detection by node name, reported below only when the
    // class-based collect above found nothing.
    val nativeScanNames = nodes
      .map(_.nodeName)
      .filter(name => name.contains("NativeParquetScan") || name.contains("NativeOrcScan"))
    if (formats.nonEmpty) {
      info(s"Detected file formats: ${formats.distinct.mkString(", ")}")
      scans.foreach { scan =>
        info(
          s"Scan requiredSchema: ${scan.requiredSchema.simpleString}, " +
            s"options: ${scan.relation.options}")
      }
    }
    if (nativeScans.nonEmpty) {
      info(s"Detected native scans: ${nativeScans.distinct.mkString(", ")}")
    }
    if (nativeScanNames.nonEmpty && nativeScans.isEmpty) {
      info(s"Detected native scans (by nodeName): ${nativeScanNames.distinct.mkString(", ")}")
    }
    if (formats.isEmpty && nativeScans.isEmpty) {
      info(s"No FileSourceScanExec/Native scan found. Plan: ${plan.simpleString(2)}")
    }
  }

  /**
   * Returns the plan that actually executed: unwraps AdaptiveSparkPlanExec so
   * callers inspect the post-AQE plan when adaptive execution is enabled.
   */
  private def materializedPlan(
      df: org.apache.spark.sql.DataFrame): org.apache.spark.sql.execution.SparkPlan = {
    // Ensure we inspect the post-AQE plan when adaptive execution is enabled.
    df.queryExecution.executedPlan match {
      case adaptive: org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec =>
        adaptive.executedPlan
      case other => other
    }
  }

  /**
   * Reads `hoodie.table.base.file.format` from the table's
   * `.hoodie/hoodie.properties` under the warehouse directory, or None when
   * the properties file does not exist (or the key is absent).
   */
  private def hudiBaseFileFormat(tableName: String): Option[String] = {
    // Read the base file format from Hudi table properties for assertions.
    val propsFile = new File(new File(warehouseDir, tableName), ".hoodie/hoodie.properties")
    if (!propsFile.exists()) {
      return None
    }
    val props = new Properties()
    val in = new FileInputStream(propsFile)
    try {
      props.load(in)
    } finally {
      in.close()
    }
    Option(props.getProperty("hoodie.table.base.file.format"))
  }

  // Skips the test unless the running Spark is at least `version`.
  // NOTE(review): if SPARK_RUNTIME_VERSION is a plain String this is a
  // lexicographic compare ("3.10" < "3.2"); fine for the supported 3.0-3.5
  // range but worth confirming against SparkVersionUtil's type.
  private def assumeSparkAtLeast(version: String): Unit = {
    val current = SparkVersionUtil.SPARK_RUNTIME_VERSION
    assume(current >= version, s"Requires Spark >= $version, current Spark $current")
  }

  test("hudi fileFormat detects parquet and orc classes") {
    assert(
      HudiScanSupport
        .fileFormat("org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat")
        .contains(HudiScanSupport.ParquetFormat))
    assert(HudiScanSupport
      .fileFormat(
        "org.apache.spark.sql.execution.datasources.parquet.Spark35LegacyHoodieParquetFileFormat")
      .contains(HudiScanSupport.ParquetFormat))
    assert(
      HudiScanSupport
        .fileFormat("org.apache.spark.sql.execution.datasources.orc.HoodieOrcFileFormat")
        .contains(HudiScanSupport.OrcFormat))
  }

  test("hudi fileFormat rejects NewHoodie formats") {
    assert(
      HudiScanSupport
        .fileFormat(
          "org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat")
        .isEmpty)
    assert(
      HudiScanSupport
        .fileFormat("org.apache.spark.sql.execution.datasources.orc.NewHoodieOrcFileFormat")
        .isEmpty)
  }

  test("hudi isSupported rejects MOR table types") {
    val options = Map("hoodie.datasource.write.table.type" -> "MERGE_ON_READ")
    assert(
      !HudiScanSupport.isSupported(
        "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat",
        options))
  }

  test("hudi isSupported allows default COW") {
    assert(
      HudiScanSupport.isSupported(
        "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat",
        Map.empty))
  }

  test("hudi isSupported rejects non-Hudi formats") {
    assert(
      !HudiScanSupport.isSupported(
        "org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat",
        Map.empty))
  }

  test("hudi: time travel falls back to Spark") {
    // TIMESTAMP AS OF syntax requires Spark 3.2+.
    assumeSparkAtLeast("3.2")
    withTable("hudi_tm") {
      spark.sql("create table hudi_tm (id int, name string) using hudi")
      spark.sql("insert into hudi_tm values (1, 'v1'), (2, 'v2')")
      spark.sql("insert into hudi_tm values (3, 'v3'), (4, 'v4')")
      // NOTE(review): takes row 0's commit time and expects it to select the
      // first commit (rows1 below asserts v1/v2); row ordering of collect()
      // is not guaranteed by SQL — confirm this is stable for Hudi scans.
      val value = spark
        .sql("select _hoodie_commit_time from hudi_tm")
        .collectAsList()
        .get(0)
        .getAs[String](0)

      val df1 = spark.sql(s"select id, name from hudi_tm timestamp AS OF $value")
      // Time travel uses metadata and should stay on Spark path.
      logFileFormats(df1)
      val rows1 = df1.collect().toSeq
      assert(rows1 == Seq(Row(1, "v1"), Row(2, "v2")))
      assertNoNativeParquetScan(df1)

      val df2 = spark.sql(s"select name from hudi_tm timestamp AS OF $value where id = 2")
      // Filters shouldn't affect fallback for time travel queries.
      logFileFormats(df2)
      val rows2 = df2.collect().toSeq
      assert(rows2 == Seq(Row("v2")))
      assertNoNativeParquetScan(df2)
    }
  }

  test("hudi: timestamp column falls back when native timestamp disabled") {
    withTable("hudi_ts") {
      spark.sql("create table hudi_ts (id int, ts timestamp) using hudi")
      spark.sql("insert into hudi_ts values (1, timestamp('2026-01-01 00:00:00'))")
      val df = spark.sql("select * from hudi_ts")
      // Timestamp columns are not supported when native timestamp scanning is
      // disabled (see spark.auron.enable.scan.parquet.timestamp=false above).
      logFileFormats(df)
      df.collect()
      assert(df.queryExecution.executedPlan.collect { case _: NativeParquetScanBase =>
        true
      }.isEmpty)
    }
  }

  test("hudi: ORC table scan converts to native (provider)") {
    withTable("hudi_orc") {
      // Several Hudi releases key the base file format off different
      // properties, so all ORC-related variants are set here.
      spark.sql("""create table hudi_orc (id int, name string)
                  |using hudi
                  |tblproperties (
                  | 'hoodie.datasource.write.table.type' = 'cow',
                  | 'hoodie.datasource.write.storage.type' = 'ORC',
                  | 'hoodie.datasource.write.base.file.format' = 'ORC',
                  | 'hoodie.table.base.file.format' = 'ORC'
                  |)""".stripMargin)
      spark.sql("insert into hudi_orc values (1, 'v1'), (2, 'v2')")
      // Skip (not fail) when this Hudi build ignored the ORC properties.
      val baseFormat = hudiBaseFileFormat("hudi_orc").getOrElse("unknown")
      info(s"Hudi base file format: $baseFormat")
      assume(
        baseFormat.equalsIgnoreCase("orc"),
        s"Expected ORC base file format but found: $baseFormat")
      val df = spark.sql("select id, name from hudi_orc where id = 2")
      // Validate provider conversion even if Spark reports generic OrcFileFormat.
      logFileFormats(df)
      val rows = df.collect().toSeq
      assert(rows == Seq(Row(2, "v2")))
      // Drive the provider directly on the pre-conversion FileSourceScanExec.
      val scan = df.queryExecution.sparkPlan.collectFirst {
        case s: org.apache.spark.sql.execution.FileSourceScanExec => s
      }
      assert(scan.isDefined)
      val provider = new HudiConvertProvider
      assert(provider.isSupported(scan.get))
      val converted = provider.convert(scan.get)
      assertHasNativeOrcScan(converted)
    }
  }

  test("hudi: Parquet table scan converts to native (provider)") {
    withTable("hudi_native_simple") {
      spark.sql("create table hudi_native_simple (id int, name string) using hudi")
      spark.sql("insert into hudi_native_simple values (1, 'v1'), (2, 'v2')")
      val df = spark.sql("select id, name from hudi_native_simple order by id")
      df.explain(true)
      // Validate provider conversion and correctness for the common COW parquet path.
      logFileFormats(df)
      val rows = df.collect().toSeq
      assert(rows == Seq(Row(1, "v1"), Row(2, "v2")))
      // Drive the provider directly on the pre-conversion FileSourceScanExec.
      val scan = df.queryExecution.sparkPlan.collectFirst {
        case s: org.apache.spark.sql.execution.FileSourceScanExec => s
      }
      assert(scan.isDefined)
      val provider = new HudiConvertProvider
      assert(provider.isSupported(scan.get))
      val converted = provider.convert(scan.get)
      assertHasNativeParquetScan(converted)
    }
  }
}