This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d6482e1b084 branch-3.0: [fix](hudi) upgrade hudi to 0.15.0 (#44267)(#44995) (#45054)
d6482e1b084 is described below

commit d6482e1b084c4fbd9d31f2347c7f3c282f9c6900
Author: Socrates <suyit...@selectdb.com>
AuthorDate: Fri Dec 6 01:48:03 2024 +0800

    branch-3.0: [fix](hudi) upgrade hudi to 0.15.0 (#44267)(#44995) (#45054)
    
    cherry-pick (#44267)(#44995)
---
 be/src/vec/exec/format/table/hudi_jni_reader.cpp   |  14 +-
 be/src/vec/exec/format/table/hudi_jni_reader.h     |   4 +-
 be/src/vec/exec/scan/vfile_scanner.cpp             |  17 +-
 build.sh                                           |   2 +
 conf/be.conf                                       |   2 +-
 fe/be-java-extensions/hadoop-hudi-scanner/pom.xml  | 227 +++++++++++++++++
 .../apache/doris/hudi/HadoopHudiColumnValue.java   | 219 +++++++++++++++++
 .../apache/doris/hudi/HadoopHudiJniScanner.java    | 271 +++++++++++++++++++++
 .../src/main/resources/package.xml                 |  41 ++++
 .../src/main/java/org/apache/doris/hudi/Utils.java |   4 +-
 .../org/apache/doris/hudi/BaseSplitReader.scala    |  15 +-
 fe/be-java-extensions/pom.xml                      |   1 +
 .../apache/doris/datasource/FileQueryScanNode.java |   2 +
 .../datasource/hive/HiveMetaStoreClientHelper.java |   5 +-
 .../apache/doris/datasource/hudi/HudiUtils.java    |   2 +-
 .../hudi/source/COWIncrementalRelation.java        |  11 +-
 .../hudi/source/HudiLocalEngineContext.java        |  67 ++---
 .../hudi/source/HudiPartitionProcessor.java        |  14 +-
 .../doris/datasource/hudi/source/HudiScanNode.java |  71 ++++--
 .../doris/datasource/hudi/source/HudiSplit.java    |   3 +-
 .../hudi/source/MORIncrementalRelation.java        |  14 +-
 .../datasource/paimon/source/PaimonScanNode.java   |  39 ++-
 .../glue/translator/PhysicalPlanTranslator.java    |   2 +-
 .../apache/doris/planner/SingleNodePlanner.java    |   2 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |  14 ++
 fe/pom.xml                                         |   6 +-
 gensrc/thrift/PlanNodes.thrift                     |   4 +-
 .../hudi/test_hudi_incremental.out                 | 174 +++++++++++++
 .../hudi/test_hudi_orc_tables.out                  |  15 ++
 .../hudi/test_hudi_schema_evolution.out            |  32 +++
 .../external_table_p2/hudi/test_hudi_snapshot.out  | Bin 348526 -> 696105 bytes
 .../external_table_p2/hudi/test_hudi_timestamp.out |  31 ++-
 .../hudi/test_hudi_timetravel.out                  | 120 +++++++++
 .../hudi/test_hudi_catalog.groovy                  |   2 +-
 .../hudi/test_hudi_incremental.groovy              |  16 +-
 ..._catalog.groovy => test_hudi_orc_tables.groovy} |  10 +-
 .../hudi/test_hudi_schema_evolution.groovy         |  14 +-
 .../hudi/test_hudi_snapshot.groovy                 |  13 +-
 .../hudi/test_hudi_timestamp.groovy                |  20 +-
 .../hudi/test_hudi_timetravel.groovy               |  15 +-
 40 files changed, 1386 insertions(+), 149 deletions(-)

diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.cpp b/be/src/vec/exec/format/table/hudi_jni_reader.cpp
index 33ba92b540a..cb109bf05a2 100644
--- a/be/src/vec/exec/format/table/hudi_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/hudi_jni_reader.cpp
@@ -18,7 +18,6 @@
 #include "hudi_jni_reader.h"
 
 #include <map>
-#include <ostream>
 
 #include "runtime/descriptors.h"
 #include "runtime/runtime_state.h"
@@ -65,7 +64,7 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
             {"input_format", _hudi_params.input_format}};
 
     // Use compatible hadoop client to read data
-    for (auto& kv : _scan_params.properties) {
+    for (const auto& kv : _scan_params.properties) {
         if (kv.first.starts_with(HOODIE_CONF_PREFIX)) {
             params[kv.first] = kv.second;
         } else {
@@ -73,8 +72,15 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
         }
     }
 
-    _jni_connector = std::make_unique<JniConnector>("org/apache/doris/hudi/HudiJniScanner", params,
-                                                    required_fields);
+    if (_hudi_params.hudi_jni_scanner == "hadoop") {
+        _jni_connector = std::make_unique<JniConnector>(
+                "org/apache/doris/hudi/HadoopHudiJniScanner", params, required_fields);
+    } else if (_hudi_params.hudi_jni_scanner == "spark") {
+        _jni_connector = std::make_unique<JniConnector>("org/apache/doris/hudi/HudiJniScanner",
+                                                        params, required_fields);
+    } else {
+        DCHECK(false) << "Unsupported hudi jni scanner: " << _hudi_params.hudi_jni_scanner;
+    }
 }
 
 Status HudiJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.h b/be/src/vec/exec/format/table/hudi_jni_reader.h
index e9bb55a69a7..bfa0291a610 100644
--- a/be/src/vec/exec/format/table/hudi_jni_reader.h
+++ b/be/src/vec/exec/format/table/hudi_jni_reader.h
@@ -17,9 +17,7 @@
 
 #pragma once
 
-#include <stddef.h>
-
-#include <memory>
+#include <cstddef>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index ba8048f73a9..296e59f8df1 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -23,18 +23,15 @@
 #include <gen_cpp/PaloInternalService_types.h>
 #include <gen_cpp/PlanNodes_types.h>
 
-#include <algorithm>
 #include <boost/iterator/iterator_facade.hpp>
 #include <iterator>
 #include <map>
-#include <ostream>
 #include <tuple>
 #include <utility>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/config.h"
 #include "common/logging.h"
-#include "common/object_pool.h"
 #include "io/cache/block_file_cache_profile.h"
 #include "runtime/descriptors.h"
 #include "runtime/runtime_state.h"
@@ -47,7 +44,6 @@
 #include "vec/common/string_ref.h"
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/core/columns_with_type_and_name.h"
-#include "vec/core/field.h"
 #include "vec/data_types/data_type.h"
 #include "vec/data_types/data_type_factory.hpp"
 #include "vec/data_types/data_type_nullable.h"
@@ -720,17 +716,16 @@ Status VFileScanner::_get_next_reader() {
 
         // create reader for specific format
         Status init_status;
-        TFileFormatType::type format_type = _params->format_type;
+        // for compatibility, if format_type is not set in range, use the format type of params
+        TFileFormatType::type format_type =
+                range.__isset.format_type ? range.format_type : _params->format_type;
         // JNI reader can only push down column value range
         bool push_down_predicates =
                 !_is_load && _params->format_type != TFileFormatType::FORMAT_JNI;
+        // for compatibility, this logic is deprecated in 3.1
         if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) {
-            if (range.table_format_params.table_format_type == "hudi" &&
-                range.table_format_params.hudi_params.delta_logs.empty()) {
-                // fall back to native reader if there is no log file
-                format_type = TFileFormatType::FORMAT_PARQUET;
-            } else if (range.table_format_params.table_format_type == "paimon" &&
-                       !range.table_format_params.paimon_params.__isset.paimon_split) {
+            if (range.table_format_params.table_format_type == "paimon" &&
+                !range.table_format_params.paimon_params.__isset.paimon_split) {
                 // use native reader
                 auto format = range.table_format_params.paimon_params.file_format;
                 if (format == "orc") {
diff --git a/build.sh b/build.sh
index 6f3ddfa236f..8f1262aa76b 100755
--- a/build.sh
+++ b/build.sh
@@ -525,6 +525,7 @@ fi
 if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then
     modules+=("fe-common")
     modules+=("be-java-extensions/hudi-scanner")
+    modules+=("be-java-extensions/hadoop-hudi-scanner")
     modules+=("be-java-extensions/java-common")
     modules+=("be-java-extensions/java-udf")
     modules+=("be-java-extensions/jdbc-scanner")
@@ -814,6 +815,7 @@ EOF
     extensions_modules=("java-udf")
     extensions_modules+=("jdbc-scanner")
     extensions_modules+=("hudi-scanner")
+    extensions_modules+=("hadoop-hudi-scanner")
     extensions_modules+=("paimon-scanner")
     extensions_modules+=("trino-connector-scanner")
     extensions_modules+=("max-compute-scanner")
diff --git a/conf/be.conf b/conf/be.conf
index fc89b5985b2..896c7b74f22 100644
--- a/conf/be.conf
+++ b/conf/be.conf
@@ -24,7 +24,7 @@ LOG_DIR="${DORIS_HOME}/log/"
 JAVA_OPTS="-Dfile.encoding=UTF-8 -Xmx2048m -DlogPath=$LOG_DIR/jni.log -Xloggc:$LOG_DIR/be.gc.log.$CUR_DATE -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=50M -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives"
 
 # For jdk 17, this JAVA_OPTS will be used as default JVM options
-JAVA_OPTS_FOR_JDK_17="-Dfile.encoding=UTF-8 -Xmx2048m -DlogPath=$LOG_DIR/jni.log -Xlog:gc*:$LOG_DIR/be.gc.log.$CUR_DATE:time,uptime:filecount=10,filesize=50M -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/ja [...]
+JAVA_OPTS_FOR_JDK_17="-Dfile.encoding=UTF-8 -Djol.skipHotspotSAAttach=true -Xmx2048m -DlogPath=$LOG_DIR/jni.log -Xlog:gc*:$LOG_DIR/be.gc.log.$CUR_DATE:time,uptime:filecount=10,filesize=50M -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-U [...]
 
 # Set your own JAVA_HOME
 # JAVA_HOME=/path/to/jdk/
diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/pom.xml b/fe/be-java-extensions/hadoop-hudi-scanner/pom.xml
new file mode 100644
index 00000000000..4b80d49de17
--- /dev/null
+++ b/fe/be-java-extensions/hadoop-hudi-scanner/pom.xml
@@ -0,0 +1,227 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>be-java-extensions</artifactId>
+        <groupId>org.apache.doris</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>hadoop-hudi-scanner</artifactId>
+
+    <properties>
+        <doris.home>${basedir}/../../</doris.home>
+        <fe_ut_parallel>1</fe_ut_parallel>
+        <hudi.version>0.15.0</hudi.version>
+        <avro.version>1.11.3</avro.version>
+        <luben.zstd.jni.version>1.5.4-2</luben.zstd.jni.version>
+        <hive-apache.version>3.1.2-22</hive-apache.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>java-common</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.thrift</groupId>
+                    <artifactId>libthrift</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs-client</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-aws</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/org.apache.hudi/hudi-common -->
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-common</artifactId>
+            <version>${hudi.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hbase</groupId>
+                    <artifactId>hbase-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hbase</groupId>
+                    <artifactId>hbase-server</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.thrift</groupId>
+                    <artifactId>libthrift</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/org.apache.hudi/hudi-io -->
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-io</artifactId>
+            <version>${hudi.version}</version>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/org.apache.hudi/hudi-hadoop-mr -->
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-hadoop-mr</artifactId>
+            <version>${hudi.version}</version>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/org.apache.parquet/parquet-hadoop -->
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-hadoop-bundle</artifactId>
+            <version>${parquet.version}</version>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/org.apache.parquet/parquet-avro -->
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-avro</artifactId>
+            <version>${parquet.version}</version>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>${avro.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>concurrent</artifactId>
+            <version>202</version>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/io.airlift/aircompressor -->
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>aircompressor</artifactId>
+            <version>${aircompressor.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.luben</groupId>
+            <artifactId>zstd-jni</artifactId>
+            <version>${luben.zstd.jni.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.esotericsoftware</groupId>
+            <artifactId>kryo-shaded</artifactId>
+            <version>4.0.2</version>
+        </dependency>
+
+        <!-- hive -->
+        <dependency>
+            <groupId>io.trino.hive</groupId>
+            <artifactId>hive-apache</artifactId>
+            <version>${hive-apache.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.thrift</groupId>
+                    <artifactId>libthrift</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.airlift</groupId>
+                    <artifactId>aircompressor</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>hadoop-hudi-scanner</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/resources/package.xml</descriptor>
+                    </descriptors>
+                    <archive>
+                        <manifest>
+                            <mainClass></mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiColumnValue.java b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiColumnValue.java
new file mode 100644
index 00000000000..ae0199d07d2
--- /dev/null
+++ b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiColumnValue.java
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.hudi;
+
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.ColumnValue;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.io.LongWritable;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Map;
+
+public class HadoopHudiColumnValue implements ColumnValue {
+    private ColumnType dorisType;
+    private ObjectInspector fieldInspector;
+    private Object fieldData;
+    private final ZoneId zoneId;
+
+    public HadoopHudiColumnValue(ZoneId zoneId) {
+        this.zoneId = zoneId;
+    }
+
+    public void setRow(Object record) {
+        this.fieldData = record;
+    }
+
+    public void setField(ColumnType dorisType, ObjectInspector fieldInspector) {
+        this.dorisType = dorisType;
+        this.fieldInspector = fieldInspector;
+    }
+
+    private Object inspectObject() {
+        return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData);
+    }
+
+    @Override
+    public boolean getBoolean() {
+        return (boolean) inspectObject();
+    }
+
+    @Override
+    public short getShort() {
+        return (short) inspectObject();
+    }
+
+    @Override
+    public int getInt() {
+        return (int) inspectObject();
+    }
+
+    @Override
+    public float getFloat() {
+        return (float) inspectObject();
+    }
+
+    @Override
+    public long getLong() {
+        return (long) inspectObject();
+    }
+
+    @Override
+    public double getDouble() {
+        return (double) inspectObject();
+    }
+
+    @Override
+    public String getString() {
+        return inspectObject().toString();
+    }
+
+    @Override
+    public byte[] getBytes() {
+        return (byte[]) inspectObject();
+    }
+
+
+    @Override
+    public byte getByte() {
+        throw new UnsupportedOperationException("Hoodie type does not support tinyint");
+    }
+
+    @Override
+    public BigDecimal getDecimal() {
+        return ((HiveDecimal) inspectObject()).bigDecimalValue();
+    }
+
+    @Override
+    public LocalDate getDate() {
+        return LocalDate.ofEpochDay((((DateObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData))
+                .toEpochDay());
+    }
+
+    @Override
+    public LocalDateTime getDateTime() {
+        if (fieldData instanceof Timestamp) {
+            return ((Timestamp) fieldData).toLocalDateTime();
+        } else if (fieldData instanceof TimestampWritableV2) {
+            return LocalDateTime.ofInstant(Instant.ofEpochSecond((((TimestampObjectInspector) fieldInspector)
+                    .getPrimitiveJavaObject(fieldData)).toEpochSecond()), zoneId);
+        } else {
+            long datetime = ((LongWritable) fieldData).get();
+            long seconds;
+            long nanoseconds;
+            if (dorisType.getPrecision() == 3) {
+                seconds = datetime / 1000;
+                nanoseconds = (datetime % 1000) * 1000000;
+            } else if (dorisType.getPrecision() == 6) {
+                seconds = datetime / 1000000;
+                nanoseconds = (datetime % 1000000) * 1000;
+            } else {
+                throw new RuntimeException("Hoodie timestamp only support milliseconds and microseconds, "
+                        + "wrong precision = " + dorisType.getPrecision());
+            }
+            return LocalDateTime.ofInstant(Instant.ofEpochSecond(seconds, nanoseconds), zoneId);
+        }
+    }
+
+    @Override
+    public boolean canGetStringAsBytes() {
+        return false;
+    }
+
+    @Override
+    public boolean isNull() {
+        return fieldData == null;
+    }
+
+    @Override
+    public BigInteger getBigInteger() {
+        throw new UnsupportedOperationException("Hoodie type does not support largeint");
+    }
+
+    @Override
+    public byte[] getStringAsBytes() {
+        throw new UnsupportedOperationException("Hoodie type does not support getStringAsBytes");
+    }
+
+    @Override
+    public void unpackArray(List<ColumnValue> values) {
+        ListObjectInspector inspector = (ListObjectInspector) fieldInspector;
+        List<?> items = inspector.getList(fieldData);
+        ObjectInspector itemInspector = inspector.getListElementObjectInspector();
+        for (int i = 0; i < items.size(); i++) {
+            Object item = items.get(i);
+            HadoopHudiColumnValue childValue = new HadoopHudiColumnValue(zoneId);
+            childValue.setRow(item);
+            childValue.setField(dorisType.getChildTypes().get(0), itemInspector);
+            values.add(childValue);
+        }
+    }
+
+    @Override
+    public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
+        MapObjectInspector inspector = (MapObjectInspector) fieldInspector;
+        ObjectInspector keyObjectInspector = inspector.getMapKeyObjectInspector();
+        ObjectInspector valueObjectInspector = inspector.getMapValueObjectInspector();
+        for (Map.Entry kv : inspector.getMap(fieldData).entrySet()) {
+            HadoopHudiColumnValue key = new HadoopHudiColumnValue(zoneId);
+            key.setRow(kv.getKey());
+            key.setField(dorisType.getChildTypes().get(0), keyObjectInspector);
+            keys.add(key);
+
+            HadoopHudiColumnValue value = new HadoopHudiColumnValue(zoneId);
+            value.setRow(kv.getValue());
+            value.setField(dorisType.getChildTypes().get(1), valueObjectInspector);
+            values.add(value);
+        }
+    }
+
+    @Override
+    public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
+        StructObjectInspector inspector = (StructObjectInspector) fieldInspector;
+        List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+        for (int i = 0; i < structFieldIndex.size(); i++) {
+            Integer idx = structFieldIndex.get(i);
+            HadoopHudiColumnValue value = new HadoopHudiColumnValue(zoneId);
+            Object obj = null;
+            if (idx != null) {
+                StructField sf = fields.get(idx);
+                obj = inspector.getStructFieldData(fieldData, sf);
+            }
+            value.setRow(obj);
+            value.setField(dorisType.getChildTypes().get(i), fields.get(i).getFieldObjectInspector());
+            values.add(value);
+        }
+    }
+}
diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java
new file mode 100644
index 00000000000..f2b38815a36
--- /dev/null
+++ b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.hudi;
+
+import org.apache.doris.common.classloader.ThreadClassLoaderContext;
+import org.apache.doris.common.jni.JniScanner;
+import org.apache.doris.common.jni.vec.ColumnType;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * HadoopHudiJniScanner is a JniScanner implementation that reads Hudi data using hudi-hadoop-mr.
+ */
+public class HadoopHudiJniScanner extends JniScanner {
+    private static final Logger LOG = LoggerFactory.getLogger(HadoopHudiJniScanner.class);
+
+    private static final String HADOOP_CONF_PREFIX = "hadoop_conf.";
+
+    // Hudi data info
+    private final String basePath;
+    private final String dataFilePath;
+    private final long dataFileLength;
+    private final String[] deltaFilePaths;
+    private final String instantTime;
+    private final String serde;
+    private final String inputFormat;
+
+    // schema info
+    private final String hudiColumnNames;
+    private final String[] hudiColumnTypes;
+    private final String[] requiredFields;
+    private List<Integer> requiredColumnIds;
+    private ColumnType[] requiredTypes;
+
+    // Hadoop info
+    private RecordReader<NullWritable, ArrayWritable> reader;
+    private StructObjectInspector rowInspector;
+    private final ObjectInspector[] fieldInspectors;
+    private final StructField[] structFields;
+    private Deserializer deserializer;
+    private final Map<String, String> fsOptionsProps;
+
+    // scanner info
+    private final HadoopHudiColumnValue columnValue;
+    private final int fetchSize;
+    private final ClassLoader classLoader;
+
+    public HadoopHudiJniScanner(int fetchSize, Map<String, String> params) {
+        this.basePath = params.get("base_path");
+        this.dataFilePath = params.get("data_file_path");
+        this.dataFileLength = Long.parseLong(params.get("data_file_length"));
+        if (Strings.isNullOrEmpty(params.get("delta_file_paths"))) {
+            this.deltaFilePaths = new String[0];
+        } else {
+            this.deltaFilePaths = params.get("delta_file_paths").split(",");
+        }
+        this.instantTime = params.get("instant_time");
+        this.serde = params.get("serde");
+        this.inputFormat = params.get("input_format");
+
+        this.hudiColumnNames = params.get("hudi_column_names");
+        this.hudiColumnTypes = params.get("hudi_column_types").split("#");
+        this.requiredFields = params.get("required_fields").split(",");
+
+        this.fieldInspectors = new ObjectInspector[requiredFields.length];
+        this.structFields = new StructField[requiredFields.length];
+        this.fsOptionsProps = Maps.newHashMap();
+        for (Map.Entry<String, String> entry : params.entrySet()) {
+            if (entry.getKey().startsWith(HADOOP_CONF_PREFIX)) {
+                fsOptionsProps.put(entry.getKey().substring(HADOOP_CONF_PREFIX.length()), entry.getValue());
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("get hudi params {}: {}", entry.getKey(), entry.getValue());
+            }
+        }
+
+        ZoneId zoneId;
+        if (Strings.isNullOrEmpty(params.get("time_zone"))) {
+            zoneId = ZoneId.systemDefault();
+        } else {
+            zoneId = ZoneId.of(params.get("time_zone"));
+        }
+        this.columnValue = new HadoopHudiColumnValue(zoneId);
+        this.fetchSize = fetchSize;
+        this.classLoader = this.getClass().getClassLoader();
+    }
+
+    @Override
+    public void open() throws IOException {
+        try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
+            initRequiredColumnsAndTypes();
+            initTableInfo(requiredTypes, requiredFields, fetchSize);
+            Properties properties = getReaderProperties();
+            initReader(properties);
+        } catch (Exception e) {
+            close();
+            LOG.warn("failed to open hadoop hudi jni scanner", e);
+            throw new IOException("failed to open hadoop hudi jni scanner: " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public int getNext() throws IOException {
+        try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
+            NullWritable key = reader.createKey();
+            ArrayWritable value = reader.createValue();
+            int numRows = 0;
+            for (; numRows < fetchSize; numRows++) {
+                if (!reader.next(key, value)) {
+                    break;
+                }
+                Object rowData = deserializer.deserialize(value);
+                for (int i = 0; i < fields.length; i++) {
+                    Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
+                    columnValue.setRow(fieldData);
+                    // LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}",
+                    //        numRows, i, types[i].getName(), types[i].getType().name(),
+                    //        fieldInspectors[i].getTypeName());
+                    columnValue.setField(types[i], fieldInspectors[i]);
+                    appendData(i, columnValue);
+                }
+            }
+            return numRows;
+        } catch (Exception e) {
+            close();
+            LOG.warn("failed to get next in hadoop hudi jni scanner", e);
+            throw new IOException("failed to get next in hadoop hudi jni scanner: " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
+            if (reader != null) {
+                reader.close();
+            }
+        } catch (IOException e) {
+            LOG.warn("failed to close hadoop hudi jni scanner", e);
+            throw new IOException("failed to close hadoop hudi jni scanner: " + e.getMessage(), e);
+        }
+    }
+
+    private void initRequiredColumnsAndTypes() {
+        String[] splitHudiColumnNames = hudiColumnNames.split(",");
+
+        Map<String, Integer> hudiColNameToIdx =
+                IntStream.range(0, splitHudiColumnNames.length)
+                        .boxed()
+                        .collect(Collectors.toMap(i -> splitHudiColumnNames[i], i -> i));
+
+        Map<String, String> hudiColNameToType =
+                IntStream.range(0, splitHudiColumnNames.length)
+                        .boxed()
+                        .collect(Collectors.toMap(i -> splitHudiColumnNames[i], i -> hudiColumnTypes[i]));
+
+        requiredTypes = Arrays.stream(requiredFields)
+                .map(field -> ColumnType.parseType(field, hudiColNameToType.get(field)))
+                .toArray(ColumnType[]::new);
+
+        requiredColumnIds = Arrays.stream(requiredFields)
+                .mapToInt(hudiColNameToIdx::get)
+                .boxed().collect(Collectors.toList());
+    }
+
+    private Properties getReaderProperties() {
+        Properties properties = new Properties();
+        properties.setProperty("hive.io.file.readcolumn.ids", 
Joiner.on(",").join(requiredColumnIds));
+        properties.setProperty("hive.io.file.readcolumn.names", 
Joiner.on(",").join(this.requiredFields));
+        properties.setProperty("columns", this.hudiColumnNames);
+        properties.setProperty("columns.types", 
Joiner.on(",").join(hudiColumnTypes));
+        properties.setProperty("serialization.lib", this.serde);
+        properties.setProperty("hive.io.file.read.all.columns", "false");
+        fsOptionsProps.forEach(properties::setProperty);
+        return properties;
+    }
+
+    private void initReader(Properties properties) throws Exception {
+        String realtimePath = dataFileLength != -1 ? dataFilePath : deltaFilePaths[0];
+        long realtimeLength = dataFileLength != -1 ? dataFileLength : 0;
+        Path path = new Path(realtimePath);
+        FileSplit fileSplit = new FileSplit(path, 0, realtimeLength, (String[]) null);
+        List<HoodieLogFile> logFiles = Arrays.stream(deltaFilePaths).map(HoodieLogFile::new)
+                .collect(Collectors.toList());
+        FileSplit hudiSplit =
+                new HoodieRealtimeFileSplit(fileSplit, basePath, logFiles, instantTime, false, Option.empty());
+
+        JobConf jobConf = new JobConf(new Configuration());
+        properties.stringPropertyNames().forEach(name -> jobConf.set(name, properties.getProperty(name)));
+        InputFormat<?, ?> inputFormatClass = createInputFormat(jobConf, inputFormat);
+        reader = (RecordReader<NullWritable, ArrayWritable>) inputFormatClass
+                .getRecordReader(hudiSplit, jobConf, Reporter.NULL);
+
+        deserializer = getDeserializer(jobConf, properties, serde);
+        rowInspector = getTableObjectInspector(deserializer);
+        for (int i = 0; i < requiredFields.length; i++) {
+            StructField field = rowInspector.getStructFieldRef(requiredFields[i]);
+            structFields[i] = field;
+            fieldInspectors[i] = field.getFieldObjectInspector();
+        }
+    }
+
+    private InputFormat<?, ?> createInputFormat(Configuration conf, String inputFormat) throws Exception {
+        Class<?> clazz = conf.getClassByName(inputFormat);
+        Class<? extends InputFormat<?, ?>> cls =
+                (Class<? extends InputFormat<?, ?>>) clazz.asSubclass(InputFormat.class);
+        return ReflectionUtils.newInstance(cls, conf);
+    }
+
+    private Deserializer getDeserializer(Configuration configuration, Properties properties, String name)
+            throws Exception {
+        Class<? extends Deserializer> deserializerClass = Class.forName(name, true, JavaUtils.getClassLoader())
+                .asSubclass(Deserializer.class);
+        Deserializer deserializer = deserializerClass.getConstructor().newInstance();
+        deserializer.initialize(configuration, properties);
+        return deserializer;
+    }
+
+    private StructObjectInspector getTableObjectInspector(Deserializer deserializer) throws Exception {
+        ObjectInspector inspector = deserializer.getObjectInspector();
+        Preconditions.checkArgument(inspector.getCategory() == ObjectInspector.Category.STRUCT,
+                "expected STRUCT: %s", inspector.getCategory());
+        return (StructObjectInspector) inspector;
+    }
+}
diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/resources/package.xml b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/resources/package.xml
new file mode 100644
index 00000000000..4bbb2610603
--- /dev/null
+++ b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/resources/package.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
+    <id>jar-with-dependencies</id>
+    <formats>
+        <format>jar</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>/</outputDirectory>
+            <useProjectArtifact>true</useProjectArtifact>
+            <unpack>true</unpack>
+            <scope>runtime</scope>
+            <unpackOptions>
+                <excludes>
+                    <exclude>**/Log4j2Plugins.dat</exclude>
+                </excludes>
+            </unpackOptions>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java
index 5614f8bcc96..3e07c891790 100644
--- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java
+++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.security.authentication.HadoopUGI;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -75,7 +76,8 @@ public class Utils {
     }
 
     public static HoodieTableMetaClient getMetaClient(Configuration conf, String basePath) {
+        HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(conf);
         return HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> HoodieTableMetaClient.builder()
-                .setConf(conf).setBasePath(basePath).build());
+                .setConf(hadoopStorageConfiguration).setBasePath(basePath).build());
     }
 }
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
index dcc068ad700..fc8d74f9713 100644
--- a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
+++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
@@ -36,13 +36,15 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, T
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.hadoop.CachingPath
+import org.apache.hudi.hadoop.fs.CachingPath
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
-import org.apache.hudi.io.storage.HoodieAvroHFileReader
+import org.apache.hudi.io.hadoop.HoodieHBaseAvroHFileReader
 import org.apache.hudi.metadata.HoodieTableMetadataUtil
 import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieSparkConfUtils, HoodieTableSchema, HoodieTableState}
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.log4j.Logger
 import org.apache.spark.sql.adapter.Spark3_4Adapter
 import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters}
@@ -430,7 +432,7 @@ abstract class BaseSplitReader(val split: HoodieSplit) {
     try {
       if (shouldExtractPartitionValuesFromPartitionPath) {
         val filePath = new Path(split.dataFilePath)
-        val tablePathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(tableInformation.metaClient.getBasePathV2)
+        val tablePathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(new Path(tableInformation.metaClient.getBasePathV2.toUri))
         val partitionPathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(filePath.getParent)
         val relativePath = new URI(tablePathWithoutScheme.toString).relativize(new URI(partitionPathWithoutScheme.toString)).toString
         val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean
@@ -497,8 +499,11 @@ abstract class BaseSplitReader(val split: HoodieSplit) {
                                 options: Map[String, String],
                                 hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
     partitionedFile => {
-      val reader = new HoodieAvroHFileReader(
-        hadoopConf, partitionedFile.filePath.toPath, new CacheConfig(hadoopConf))
+      var hadoopStorageConfiguration = new HadoopStorageConfiguration(hadoopConf);
+      var storagePath = new StoragePath(partitionedFile.toPath.toUri.getPath);
+      var emptySchema = org.apache.hudi.common.util.Option.empty[org.apache.avro.Schema]()
+      val reader = new HoodieHBaseAvroHFileReader(
+        hadoopStorageConfiguration, storagePath, emptySchema)
 
       val requiredRowSchema = requiredDataSchema.structTypeSchema
       // NOTE: Schema has to be parsed at this point, since Avro's [[Schema]] aren't serializable
diff --git a/fe/be-java-extensions/pom.xml b/fe/be-java-extensions/pom.xml
index bbe056739d5..5d56ef76e7c 100644
--- a/fe/be-java-extensions/pom.xml
+++ b/fe/be-java-extensions/pom.xml
@@ -22,6 +22,7 @@ under the License.
     <modelVersion>4.0.0</modelVersion>
     <modules>
         <module>hudi-scanner</module>
+        <module>hadoop-hudi-scanner</module>
         <module>java-common</module>
         <module>java-udf</module>
         <module>jdbc-scanner</module>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index d1de1306d04..5ea2a2637d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -433,6 +433,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
             }
         }
 
+        // set file format type, and the type might fall back to native format in setScanParams
+        rangeDesc.setFormatType(getFileFormatType());
         setScanParams(rangeDesc, fileSplit);
         curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
         TScanRangeLocation location = new TScanRangeLocation();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
index 0f839d238b2..97f86612a49 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -831,8 +832,10 @@ public class HiveMetaStoreClientHelper {
     public static HoodieTableMetaClient getHudiClient(HMSExternalTable table) {
         String hudiBasePath = table.getRemoteTable().getSd().getLocation();
         Configuration conf = getConfiguration(table);
+        HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(conf);
         return HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf),
-                () -> HoodieTableMetaClient.builder().setConf(conf).setBasePath(hudiBasePath).build());
+                () -> HoodieTableMetaClient.builder().setConf(hadoopStorageConfiguration).setBasePath(hudiBasePath)
+                        .build());
     }
 
     public static Configuration getConfiguration(HMSExternalTable table) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
index d7803b1a516..c98d994a28a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
@@ -86,7 +86,7 @@ public class HudiUtils {
             case LONG:
                 if (logicalType instanceof LogicalTypes.TimestampMillis
                         || logicalType instanceof LogicalTypes.TimestampMicros) {
-                    return logicalType.getName();
+                    return "timestamp";
                 }
                 if (logicalType instanceof LogicalTypes.TimeMicros) {
                     return handleUnsupportedType(schema);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
index 7981a0b4f26..843dded2796 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.StoragePath;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -105,7 +106,7 @@ public class COWIncrementalRelation implements IncrementalRelation {
         List<HoodieInstant> commitsToReturn = commitsTimelineToReturn.getInstants();
 
         // todo: support configuration hoodie.datasource.read.incr.filters
-        Path basePath = metaClient.getBasePathV2();
+        StoragePath basePath = metaClient.getBasePathV2();
         Map<String, String> regularFileIdToFullPath = new HashMap<>();
         Map<String, String> metaBootstrapFileIdToFullPath = new HashMap<>();
         HoodieTimeline replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline();
@@ -113,8 +114,8 @@ public class COWIncrementalRelation implements IncrementalRelation {
         for (HoodieInstant instant : replacedTimeline.getInstants()) {
             HoodieReplaceCommitMetadata.fromBytes(metaClient.getActiveTimeline().getInstantDetails(instant).get(),
                     HoodieReplaceCommitMetadata.class).getPartitionToReplaceFileIds().forEach(
-                        (key, value) -> value.forEach(
-                            e -> replacedFile.put(e, FSUtils.getPartitionPath(basePath, key).toString())));
+                            (key, value) -> value.forEach(
+                                    e -> replacedFile.put(e, FSUtils.constructAbsolutePath(basePath, key).toString())));
         }
 
         fileToWriteStat = new HashMap<>();
@@ -123,7 +124,7 @@ public class COWIncrementalRelation implements IncrementalRelation {
                     commitTimeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class);
             metadata.getPartitionToWriteStats().forEach((partition, stats) -> {
                 for (HoodieWriteStat stat : stats) {
-                    fileToWriteStat.put(FSUtils.getPartitionPath(basePath, stat.getPath()).toString(), stat);
+                    fileToWriteStat.put(FSUtils.constructAbsolutePath(basePath, stat.getPath()).toString(), stat);
                 }
             });
             if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS.equals(commit.getTimestamp())) {
@@ -158,7 +159,7 @@ public class COWIncrementalRelation implements IncrementalRelation {
 
         }
 
-        fs = basePath.getFileSystem(configuration);
+        fs = new Path(basePath.toUri().getPath()).getFileSystem(configuration);
         fullTableScan = shouldFullTableScan();
         includeStartTime = !fullTableScan;
         if (fullTableScan || commitsToReturn.isEmpty()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java
index 26ef6fdfef7..fecc026cf8d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java
@@ -17,10 +17,6 @@
 
 package org.apache.doris.datasource.hudi.source;
 
-import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieAccumulator;
 import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
 import org.apache.hudi.common.data.HoodieData;
@@ -39,7 +35,7 @@ import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import java.util.Collections;
 import java.util.Iterator;
@@ -50,18 +46,20 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
- * This file is copied from org.apache.hudi.common.engine.HoodieLocalEngineContext.
+ * This file is copied from
+ * org.apache.hudi.common.engine.HudiLocalEngineContext.
  * Because we need set ugi in thread pool
- * A java based engine context, use this implementation on the query engine integrations if needed.
+ * A java based engine context, use this implementation on the query engine
+ * integrations if needed.
  */
 public final class HudiLocalEngineContext extends HoodieEngineContext {
 
-    public HudiLocalEngineContext(Configuration conf) {
+    public HudiLocalEngineContext(StorageConfiguration<?> conf) {
         this(conf, new LocalTaskContextSupplier());
     }
 
-    public HudiLocalEngineContext(Configuration conf, TaskContextSupplier taskContextSupplier) {
-        super(new SerializableConfiguration(conf), taskContextSupplier);
+    public HudiLocalEngineContext(StorageConfiguration<?> conf, TaskContextSupplier taskContextSupplier) {
+        super(conf, taskContextSupplier);
+        super(conf, taskContextSupplier);
     }
 
     @Override
@@ -81,27 +79,18 @@ public final class HudiLocalEngineContext extends 
HoodieEngineContext {
 
     @Override
     public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, 
int parallelism) {
-        return data.stream().parallel().map(v1 -> {
-            try {
-                return 
HiveMetaStoreClientHelper.ugiDoAs(getHadoopConf().get(), () -> func.apply(v1));
-            } catch (Exception e) {
-                throw new HoodieException("Error occurs when executing map", 
e);
-            }
-        }).collect(Collectors.toList());
+        return 
data.stream().parallel().map(FunctionWrapper.throwingMapWrapper(func)).collect(Collectors.toList());
     }
 
     @Override
     public <I, K, V> List<V> mapToPairAndReduceByKey(
-            List<I> data,
-            SerializablePairFunction<I, K, V> mapToPairFunc,
-            SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+            List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, 
SerializableBiFunction<V, V, V> reduceFunc,
+            int parallelism) {
         return 
data.stream().parallel().map(FunctionWrapper.throwingMapToPairWrapper(mapToPairFunc))
-            .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
-            .map(list ->
-                    list.stream()
-                        .map(e -> e.getValue())
+                .collect(Collectors.groupingBy(p -> 
p.getKey())).values().stream()
+                .map(list -> list.stream().map(e -> e.getValue())
                         
.reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).get())
-            .collect(Collectors.toList());
+                .collect(Collectors.toList());
     }
 
     @Override
@@ -109,29 +98,28 @@ public final class HudiLocalEngineContext extends 
HoodieEngineContext {
             Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> 
flatMapToPairFunc,
             SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
         return 
FunctionWrapper.throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
-            .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
-            .map(entry -> new ImmutablePair<>(entry.getKey(), 
entry.getValue().stream().map(
-                
Pair::getValue).reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null)))
-            .filter(Objects::nonNull);
+                
.collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
+                .map(entry -> new ImmutablePair<>(entry.getKey(), 
entry.getValue().stream().map(
+                        
Pair::getValue).reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null)))
+                .filter(Objects::nonNull);
     }
 
     @Override
     public <I, K, V> List<V> reduceByKey(
             List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, 
int parallelism) {
         return data.stream().parallel()
-            .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
-            .map(list ->
-                list.stream()
-                    .map(e -> e.getValue())
-                    
.reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null))
-            .filter(Objects::nonNull)
-            .collect(Collectors.toList());
+                .collect(Collectors.groupingBy(p -> 
p.getKey())).values().stream()
+                .map(list -> list.stream().map(e -> e.getValue())
+                        
.reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc))
+                        .orElse(null))
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
     }
 
     @Override
     public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, 
Stream<O>> func, int parallelism) {
-        return
-            
data.stream().parallel().flatMap(FunctionWrapper.throwingFlatMapWrapper(func)).collect(Collectors.toList());
+        return 
data.stream().parallel().flatMap(FunctionWrapper.throwingFlatMapWrapper(func))
+                .collect(Collectors.toList());
     }
 
     @Override
@@ -142,8 +130,7 @@ public final class HudiLocalEngineContext extends 
HoodieEngineContext {
     @Override
     public <I, K, V> Map<K, V> mapToPair(List<I> data, 
SerializablePairFunction<I, K, V> func, Integer parallelism) {
         return 
data.stream().map(FunctionWrapper.throwingMapToPairWrapper(func)).collect(
-            Collectors.toMap(Pair::getLeft, Pair::getRight, (oldVal, newVal) 
-> newVal)
-        );
+                Collectors.toMap(Pair::getLeft, Pair::getRight, (oldVal, 
newVal) -> newVal));
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
index 738b2638588..0ab9fef951a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
@@ -21,7 +21,6 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
-import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 
@@ -49,14 +48,15 @@ public abstract class HudiPartitionProcessor {
                 .build();
 
         HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create(
-                new HudiLocalEngineContext(tableMetaClient.getHadoopConf()), 
metadataConfig,
+                new HudiLocalEngineContext(tableMetaClient.getStorageConf()), 
tableMetaClient.getStorage(),
+                metadataConfig,
                 tableMetaClient.getBasePathV2().toString(), true);
 
         return newTableMetadata.getAllPartitionPaths();
     }
 
     public List<String> getPartitionNamesBeforeOrEquals(HoodieTimeline 
timeline, String timestamp) {
-        return new ArrayList<>(HoodieInputFormatUtils.getWritePartitionPaths(
+        return new ArrayList<>(HoodieTableMetadataUtil.getWritePartitionPaths(
                 
timeline.findInstantsBeforeOrEquals(timestamp).getInstants().stream().map(instant
 -> {
                     try {
                         return TimelineUtils.getCommitMetadata(instant, 
timeline);
@@ -67,7 +67,7 @@ public abstract class HudiPartitionProcessor {
     }
 
     public List<String> getPartitionNamesInRange(HoodieTimeline timeline, 
String startTimestamp, String endTimestamp) {
-        return new ArrayList<>(HoodieInputFormatUtils.getWritePartitionPaths(
+        return new ArrayList<>(HoodieTableMetadataUtil.getWritePartitionPaths(
                 timeline.findInstantsInRange(startTimestamp, 
endTimestamp).getInstants().stream().map(instant -> {
                     try {
                         return TimelineUtils.getCommitMetadata(instant, 
timeline);
@@ -101,8 +101,10 @@ public abstract class HudiPartitionProcessor {
             } else {
                 // If the partition column size is not equal to the partition 
fragments size
                 // and the partition column size > 1, we do not know how to 
map the partition
-                // fragments to the partition columns and therefore return an 
empty tuple. We don't
-                // fail outright so that in some cases we can fallback to 
reading the table as non-partitioned
+                // fragments to the partition columns and therefore return an 
empty tuple. We
+                // don't
+                // fail outright so that in some cases we can fallback to 
reading the table as
+                // non-partitioned
                 // one
                 throw new RuntimeException("Failed to parse partition values 
of path: " + partitionPath);
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index a8f2a362bfd..a73a2065d0f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.FileFormatUtils;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.FileSplit;
@@ -37,6 +38,7 @@ import org.apache.doris.datasource.hudi.HudiUtils;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TExplainLevel;
@@ -48,8 +50,6 @@ import org.apache.doris.thrift.TTableFormatFileDesc;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
@@ -62,6 +62,8 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -87,7 +89,7 @@ public class HudiScanNode extends HiveScanNode {
 
     private static final Logger LOG = LogManager.getLogger(HudiScanNode.class);
 
-    private boolean isCowOrRoTable;
+    private boolean isCowTable;
 
     private final AtomicLong noLogsSplitNum = new AtomicLong(0);
 
@@ -113,19 +115,23 @@ public class HudiScanNode extends HiveScanNode {
     private boolean incrementalRead = false;
     private TableScanParams scanParams;
     private IncrementalRelation incrementalRelation;
+    private SessionVariable sessionVariable;
 
     /**
      * External file scan node for Query Hudi table
-     * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check 
column priv
+     * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check 
column
+     * priv
      * eg: s3 tvf
-     * These scan nodes do not have corresponding catalog/database/table info, 
so no need to do priv check
+     * These scan nodes do not have corresponding catalog/database/table info, 
so no
+     * need to do priv check
      */
     public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv,
-            Optional<TableScanParams> scanParams, 
Optional<IncrementalRelation> incrementalRelation) {
+            Optional<TableScanParams> scanParams, 
Optional<IncrementalRelation> incrementalRelation,
+            SessionVariable sessionVariable) {
         super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, 
needCheckColumnPriv);
-        isCowOrRoTable = hmsTable.isHoodieCowTable();
+        isCowTable = hmsTable.isHoodieCowTable();
         if (LOG.isDebugEnabled()) {
-            if (isCowOrRoTable) {
+            if (isCowTable) {
                 LOG.debug("Hudi table {} can read as cow/read optimize table", 
hmsTable.getFullQualifiers());
             } else {
                 LOG.debug("Hudi table {} is a mor table, and will use JNI to 
read data in BE",
@@ -136,11 +142,12 @@ public class HudiScanNode extends HiveScanNode {
         this.scanParams = scanParams.orElse(null);
         this.incrementalRelation = incrementalRelation.orElse(null);
         this.incrementalRead = (this.scanParams != null && 
this.scanParams.incrementalRead());
+        this.sessionVariable = sessionVariable;
     }
 
     @Override
     public TFileFormatType getFileFormatType() throws UserException {
-        if (isCowOrRoTable) {
+        if (canUseNativeReader()) {
             return super.getFileFormatType();
         } else {
             // Use jni to read hudi table in BE
@@ -185,13 +192,13 @@ public class HudiScanNode extends HiveScanNode {
             throw new UserException("Not support function '" + 
scanParams.getParamType() + "' in hudi table");
         }
         if (incrementalRead) {
-            if (isCowOrRoTable) {
+            if (isCowTable) {
                 try {
                     Map<String, String> serd = 
hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters();
                     if ("true".equals(serd.get("hoodie.query.as.ro.table"))
                             && 
hmsTable.getRemoteTable().getTableName().endsWith("_ro")) {
                         // Incremental read RO table as RT table, I don't know 
why?
-                        isCowOrRoTable = false;
+                        isCowTable = false;
                         LOG.warn("Execute incremental read on RO table: {}", 
hmsTable.getFullQualifiers());
                     }
                 } catch (Exception e) {
@@ -236,7 +243,22 @@ public class HudiScanNode extends HiveScanNode {
     @Override
     protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
         if (split instanceof HudiSplit) {
-            setHudiParams(rangeDesc, (HudiSplit) split);
+            HudiSplit hudiSplit = (HudiSplit) split;
+            if (rangeDesc.getFormatType() == TFileFormatType.FORMAT_JNI
+                    && !sessionVariable.isForceJniScanner()
+                    && hudiSplit.getHudiDeltaLogs().isEmpty()) {
+                // no delta logs, so this is read-optimized data; fall back to the native reader
+                String fileFormat = 
FileFormatUtils.getFileFormatBySuffix(hudiSplit.getDataFilePath())
+                        .orElse("Unknown");
+                if (fileFormat.equals("parquet")) {
+                    rangeDesc.setFormatType(TFileFormatType.FORMAT_PARQUET);
+                } else if (fileFormat.equals("orc")) {
+                    rangeDesc.setFormatType(TFileFormatType.FORMAT_ORC);
+                } else {
+                    throw new RuntimeException("Unsupported file format: " + 
fileFormat);
+                }
+            }
+            setHudiParams(rangeDesc, hudiSplit);
         }
     }
 
@@ -255,10 +277,15 @@ public class HudiScanNode extends HiveScanNode {
         fileDesc.setColumnTypes(hudiSplit.getHudiColumnTypes());
         // TODO(gaoxin): support complex types
         // fileDesc.setNestedFields(hudiSplit.getNestedFields());
+        fileDesc.setHudiJniScanner(hudiSplit.getHudiJniScanner());
         tableFormatFileDesc.setHudiParams(fileDesc);
         rangeDesc.setTableFormatParams(tableFormatFileDesc);
     }
 
+    private boolean canUseNativeReader() {
+        return !sessionVariable.isForceJniScanner() && isCowTable;
+    }
+
     private List<HivePartition> getPrunedPartitions(
             HoodieTableMetaClient metaClient, Option<String> 
snapshotTimestamp) throws AnalysisException {
         List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
@@ -304,7 +331,8 @@ public class HudiScanNode extends HiveScanNode {
                 }
             }
         }
-        // unpartitioned table, create a dummy partition to save location and 
inputformat,
+        // unpartitioned table, create a dummy partition to save location and
+        // inputformat,
         // so that we can unify the interface.
         HivePartition dummyPartition = new HivePartition(hmsTable.getDbName(), 
hmsTable.getName(), true,
                 hmsTable.getRemoteTable().getSd().getInputFormat(),
@@ -315,7 +343,7 @@ public class HudiScanNode extends HiveScanNode {
     }
 
     private List<Split> getIncrementalSplits() {
-        if (isCowOrRoTable) {
+        if (canUseNativeReader()) {
             List<Split> splits = incrementalRelation.collectSplits();
             noLogsSplitNum.addAndGet(splits.size());
             return splits;
@@ -336,15 +364,15 @@ public class HudiScanNode extends HiveScanNode {
             globPath = hudiClient.getBasePathV2().toString() + "/*";
         } else {
             partitionName = 
FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
-                    new Path(partition.getPath()));
+                    new StoragePath(partition.getPath()));
             globPath = String.format("%s/%s/*", 
hudiClient.getBasePathV2().toString(), partitionName);
         }
-        List<FileStatus> statuses = FSUtils.getGlobStatusExcludingMetaFolder(
-                hudiClient.getRawFs(), new Path(globPath));
+        List<StoragePathInfo> statuses = 
FSUtils.getGlobStatusExcludingMetaFolder(
+                hudiClient.getRawHoodieStorage(), new StoragePath(globPath));
         HoodieTableFileSystemView fileSystemView = new 
HoodieTableFileSystemView(hudiClient,
-                timeline, statuses.toArray(new FileStatus[0]));
+                timeline, statuses);
 
-        if (isCowOrRoTable) {
+        if (canUseNativeReader()) {
             fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, 
queryInstant).forEach(baseFile -> {
                 noLogsSplitNum.incrementAndGet();
                 String filePath = baseFile.getPath();
@@ -473,9 +501,9 @@ public class HudiScanNode extends HiveScanNode {
         fileSlice.getPartitionPath();
 
         List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath)
-                .map(Path::toString)
+                .map(StoragePath::toString)
                 .collect(Collectors.toList());
-        if (logs.isEmpty()) {
+        if (logs.isEmpty() && !sessionVariable.isForceJniScanner()) {
             noLogsSplitNum.incrementAndGet();
         }
 
@@ -492,6 +520,7 @@ public class HudiScanNode extends HiveScanNode {
         split.setHudiColumnNames(columnNames);
         split.setHudiColumnTypes(columnTypes);
         split.setInstantTime(queryInstant);
+        split.setHudiJniScanner(sessionVariable.getHudiJniScanner());
         return split;
     }
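
With the HudiScanNode changes above, a split whose delta-log list is empty is
re-marked as FORMAT_PARQUET or FORMAT_ORC so the BE can read it natively,
unless the JNI scanner is forced. A minimal sketch of the session toggle,
assuming the user_activity_log_mor_partition table from the regression suites
(illustrative only):

    -- default (false): read-optimized splits fall back to the native reader
    set force_jni_scanner = false;
    select count(*) from user_activity_log_mor_partition;

    -- force every Hudi split through the JNI scanner instead
    set force_jni_scanner = true;
    select count(*) from user_activity_log_mor_partition;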
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java
index c72f7621fea..2270d201793 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java
@@ -40,6 +40,5 @@ public class HudiSplit extends FileSplit {
     private List<String> hudiColumnNames;
     private List<String> hudiColumnTypes;
     private List<String> nestedFields;
+    private String hudiJniScanner;
 }
-
-
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
index c06fcc2a578..7df01359922 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
@@ -20,9 +20,7 @@ package org.apache.doris.datasource.hudi.source;
 import org.apache.doris.spi.Split;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.GlobPattern;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -34,6 +32,8 @@ import 
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.storage.StoragePathInfo;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -54,7 +54,7 @@ public class MORIncrementalRelation implements 
IncrementalRelation {
     private final boolean endInstantArchived;
     private final List<HoodieInstant> includedCommits;
     private final List<HoodieCommitMetadata> commitsMetadata;
-    private final FileStatus[] affectedFilesInCommits;
+    private final List<StoragePathInfo> affectedFilesInCommits;
     private final boolean fullTableScan;
     private final String globPattern;
     private final boolean includeStartTime;
@@ -96,7 +96,7 @@ public class MORIncrementalRelation implements 
IncrementalRelation {
         includedCommits = getIncludedCommits();
         commitsMetadata = getCommitsMetadata();
         affectedFilesInCommits = 
HoodieInputFormatUtils.listAffectedFilesForCommits(configuration,
-                new Path(metaClient.getBasePath()), commitsMetadata);
+                metaClient.getBasePathV2(), commitsMetadata);
         fullTableScan = shouldFullTableScan();
         if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME 
&& fullTableScan) {
             throw new HoodieException("Cannot use stateTransitionTime while 
enables full table scan");
@@ -152,8 +152,8 @@ public class MORIncrementalRelation implements 
IncrementalRelation {
         if (should) {
             return true;
         }
-        for (FileStatus fileStatus : affectedFilesInCommits) {
-            if (!metaClient.getFs().exists(fileStatus.getPath())) {
+        for (StoragePathInfo fileStatus : affectedFilesInCommits) {
+            if 
(!metaClient.getRawHoodieStorage().exists(fileStatus.getPath())) {
                 return true;
             }
         }
@@ -199,7 +199,7 @@ public class MORIncrementalRelation implements 
IncrementalRelation {
         String latestCommit = includedCommits.get(includedCommits.size() - 
1).getTimestamp();
         HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, scanTimeline,
                 affectedFilesInCommits);
-        Stream<FileSlice> fileSlices = 
HoodieInputFormatUtils.getWritePartitionPaths(commitsMetadata)
+        Stream<FileSlice> fileSlices = 
HoodieTableMetadataUtil.getWritePartitionPaths(commitsMetadata)
                 .stream().flatMap(relativePartitionPath ->
                         
fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, 
latestCommit));
         if ("".equals(globPattern)) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 59f51c8425c..bf917804fb7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -105,9 +105,9 @@ public class PaimonScanNode extends FileQueryScanNode {
     private String serializedTable;
 
     public PaimonScanNode(PlanNodeId id,
-                          TupleDescriptor desc,
-                          boolean needCheckColumnPriv,
-                          SessionVariable sessionVariable) {
+            TupleDescriptor desc,
+            boolean needCheckColumnPriv,
+            SessionVariable sessionVariable) {
         super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, 
needCheckColumnPriv);
         this.sessionVariable = sessionVariable;
     }
@@ -127,8 +127,7 @@ public class PaimonScanNode extends FileQueryScanNode {
         predicates = paimonPredicateConverter.convertToPaimonExpr(conjuncts);
     }
 
-    private static final Base64.Encoder BASE64_ENCODER =
-            java.util.Base64.getUrlEncoder().withoutPadding();
+    private static final Base64.Encoder BASE64_ENCODER = 
java.util.Base64.getUrlEncoder().withoutPadding();
 
     public static <T> String encodeObjectToString(T t) {
         try {
@@ -156,11 +155,24 @@ public class PaimonScanNode extends FileQueryScanNode {
         
tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
         TPaimonFileDesc fileDesc = new TPaimonFileDesc();
         org.apache.paimon.table.source.Split split = paimonSplit.getSplit();
+
+        String fileFormat = getFileFormat(paimonSplit.getPathString());
         if (split != null) {
             // use jni reader
+            rangeDesc.setFormatType(TFileFormatType.FORMAT_JNI);
             fileDesc.setPaimonSplit(encodeObjectToString(split));
+        } else {
+            // use native reader
+            if (fileFormat.equals("orc")) {
+                rangeDesc.setFormatType(TFileFormatType.FORMAT_ORC);
+            } else if (fileFormat.equals("parquet")) {
+                rangeDesc.setFormatType(TFileFormatType.FORMAT_PARQUET);
+            } else {
+                throw new RuntimeException("Unsupported file format: " + 
fileFormat);
+            }
         }
-        fileDesc.setFileFormat(getFileFormat(paimonSplit.getPathString()));
+
+        fileDesc.setFileFormat(fileFormat);
         fileDesc.setPaimonPredicate(encodeObjectToString(predicates));
         
fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot -> 
slot.getColumn().getName())
                 .collect(Collectors.joining(",")));
@@ -172,7 +184,8 @@ public class PaimonScanNode extends FileQueryScanNode {
         fileDesc.setTblId(source.getTargetTable().getId());
         fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime());
         fileDesc.setPaimonTable(encodeObjectToString(source.getPaimonTable()));
-        // The hadoop conf should be same with 
PaimonExternalCatalog.createCatalog()#getConfiguration()
+        // The hadoop conf should be same with
+        // PaimonExternalCatalog.createCatalog()#getConfiguration()
         
fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getHadoopProperties());
         Optional<DeletionFile> optDeletionFile = paimonSplit.getDeletionFile();
         if (optDeletionFile.isPresent()) {
@@ -190,8 +203,8 @@ public class PaimonScanNode extends FileQueryScanNode {
     @Override
     public List<Split> getSplits() throws UserException {
         boolean forceJniScanner = sessionVariable.isForceJniScanner();
-        SessionVariable.IgnoreSplitType ignoreSplitType =
-                
SessionVariable.IgnoreSplitType.valueOf(sessionVariable.getIgnoreSplitType());
+        SessionVariable.IgnoreSplitType ignoreSplitType = 
SessionVariable.IgnoreSplitType
+                .valueOf(sessionVariable.getIgnoreSplitType());
         List<Split> splits = new ArrayList<>();
         int[] projected = desc.getSlots().stream().mapToInt(
                 slot -> 
(source.getPaimonTable().rowType().getFieldNames().indexOf(slot.getColumn().getName())))
@@ -288,7 +301,8 @@ public class PaimonScanNode extends FileQueryScanNode {
         }
         this.selectedPartitionNum = selectedPartitionValues.size();
         // TODO: get total partition number
-        // We should set fileSplitSize at the end because fileSplitSize may be 
modified in splitFile.
+        // We should set fileSplitSize at the end because fileSplitSize may be 
modified
+        // in splitFile.
         splits.forEach(s -> s.setTargetSplitSize(fileSplitSize));
         return splits;
     }
@@ -318,8 +332,9 @@ public class PaimonScanNode extends FileQueryScanNode {
 
     @Override
     public List<String> getPathPartitionKeys() throws DdlException, 
MetaNotFoundException {
-        //                return new 
ArrayList<>(source.getPaimonTable().partitionKeys());
-        //Paymon is not aware of partitions and bypasses some existing logic 
by returning an empty list
+        // return new ArrayList<>(source.getPaimonTable().partitionKeys());
+        // Paimon is not aware of partitions and bypasses some existing logic by
+        // returning an empty list
         return new ArrayList<>();
     }
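
PaimonScanNode now sets the range-level format per split: splits carrying a
serialized Paimon split stay on FORMAT_JNI, while raw-file splits are marked
FORMAT_ORC or FORMAT_PARQUET for the native readers. A minimal sketch of the
override, assuming a hypothetical Paimon table named paimon_tbl:

    -- hypothetical table; forces even raw-file splits through the JNI reader
    set force_jni_scanner = true;
    select count(*) from paimon_tbl;
    set force_jni_scanner = false;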
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index a53337c7d86..61d07ef6bbc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -652,7 +652,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                         + " for Hudi table");
         PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan;
         ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false,
-                hudiScan.getScanParams(), hudiScan.getIncrementalRelation());
+                hudiScan.getScanParams(), hudiScan.getIncrementalRelation(), 
ConnectContext.get().getSessionVariable());
         if (fileScan.getTableSnapshot().isPresent()) {
             ((FileQueryScanNode) 
scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get());
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 4091640066c..ae5d562bd5b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -1968,7 +1968,7 @@ public class SingleNodePlanner {
                                     + "please set enable_nereids_planner = 
true to enable new optimizer");
                         }
                         scanNode = new HudiScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true,
-                                Optional.empty(), Optional.empty());
+                                Optional.empty(), Optional.empty(), 
ConnectContext.get().getSessionVariable());
                         break;
                     case ICEBERG:
                         scanNode = new IcebergScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 3ac50c9d7b5..35828c90f8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -633,6 +633,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String FORCE_JNI_SCANNER = "force_jni_scanner";
 
+    public static final String HUDI_JNI_SCANNER = "hudi_jni_scanner";
+
     public static final String ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE = 
"enable_count_push_down_for_external_table";
 
     public static final String SHOW_ALL_FE_CONNECTION = 
"show_all_fe_connection";
@@ -2072,6 +2074,10 @@ public class SessionVariable implements Serializable, 
Writable {
             description = {"强制使用jni方式读取外表", "Force the use of jni mode to read 
external table"})
     private boolean forceJniScanner = false;
 
+    @VariableMgr.VarAttr(name = HUDI_JNI_SCANNER, description = { "使用哪种hudi jni scanner, 'hadoop' 或 'spark'",
+            "Which hudi jni scanner to use, 'hadoop' or 'spark'" })
+    private String hudiJniScanner = "hadoop";
+
     @VariableMgr.VarAttr(name = ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE,
             description = {"对外表启用 count(*) 下推优化", "enable count(*) pushdown 
optimization for external table"})
     private boolean enableCountPushDownForExternalTable = true;
@@ -4482,6 +4488,10 @@ public class SessionVariable implements Serializable, 
Writable {
         return forceJniScanner;
     }
 
+    public String getHudiJniScanner() {
+        return hudiJniScanner;
+    }
+
     public String getIgnoreSplitType() {
         return ignoreSplitType;
     }
@@ -4502,6 +4512,10 @@ public class SessionVariable implements Serializable, 
Writable {
         forceJniScanner = force;
     }
 
+    public void setHudiJniScanner(String hudiJniScanner) {
+        this.hudiJniScanner = hudiJniScanner;
+    }
+
     public boolean isEnableCountPushDownForExternalTable() {
         return enableCountPushDownForExternalTable;
     }
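
The new hudi_jni_scanner session variable selects which JNI implementation
backs Hudi reads when the JNI path is taken; it defaults to 'hadoop' and also
accepts 'spark'. A minimal usage sketch:

    set hudi_jni_scanner = 'hadoop';  -- default
    set hudi_jni_scanner = 'spark';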
diff --git a/fe/pom.xml b/fe/pom.xml
index d78cfd50b81..0d2e3f70aa6 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -317,7 +317,7 @@ under the License.
         <avro.version>1.11.4</avro.version>
         <arrow.version>17.0.0</arrow.version>
         <!-- hudi -->
-        <hudi.version>0.14.1</hudi.version>
+        <hudi.version>0.15.0</hudi.version>
         <presto.hadoop.version>2.7.4-11</presto.hadoop.version>
         <presto.hive.version>3.0.0-8</presto.hive.version>
         <!-- lakesoul -->
@@ -371,7 +371,7 @@ under the License.
         <trino.version>435</trino.version>
         <jakarta.annotation-api.version>2.1.1</jakarta.annotation-api.version>
         <asm.version>9.4</asm.version>
-        <airlift.version>202</airlift.version>
+        <airlift.concurrent.version>202</airlift.concurrent.version>
         <azure.sdk.version>1.2.27</azure.sdk.version>
         <azure.sdk.batch.version>12.22.0</azure.sdk.batch.version>
         <semver4j.version>5.3.0</semver4j.version>
@@ -1649,7 +1649,7 @@ under the License.
             <dependency>
                 <groupId>io.airlift</groupId>
                 <artifactId>concurrent</artifactId>
-                <version>${airlift.version}</version>
+                <version>${airlift.concurrent.version}</version>
             </dependency>
             <dependency>
                 <groupId>com.azure</groupId>
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 0bbd364fda1..3a0a995ca45 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -353,7 +353,6 @@ struct TMaxComputeFileDesc {
     1: optional string partition_spec // deprecated 
     2: optional string session_id 
     3: optional string table_batch_read_session
-
 }
 
 struct THudiFileDesc {
@@ -367,6 +366,7 @@ struct THudiFileDesc {
     8: optional list<string> column_names;
     9: optional list<string> column_types;
     10: optional list<string> nested_fields;
+    11: optional string hudi_jni_scanner;
 }
 
 struct TLakeSoulFileDesc {
@@ -406,6 +406,7 @@ enum TTextSerdeType {
 struct TFileScanRangeParams {
     // deprecated, move to TFileScanRange
     1: optional Types.TFileType file_type;
+    // deprecated, move to TFileScanRange
     2: optional TFileFormatType format_type;
     // deprecated, move to TFileScanRange
     3: optional TFileCompressType compress_type;
@@ -480,6 +481,7 @@ struct TFileRangeDesc {
     // for hive table, different files may have different fs,
     // so fs_name should be with TFileRangeDesc
     12: optional string fs_name
+    13: optional TFileFormatType format_type;
 }
 
 struct TSplitSource {
diff --git 
a/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out 
b/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out
index b1bdad85013..50644f34961 100644
--- a/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out
+++ b/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out
@@ -347,3 +347,177 @@
 -- !incremental_9_10 --
 1000
 
+-- !incremental_1_end --
+9000
+
+-- !incremental_earliest_1 --
+1000
+
+-- !incremental_2_end --
+8000
+
+-- !incremental_earliest_2 --
+2000
+
+-- !incremental_1_2 --
+1000
+
+-- !incremental_3_end --
+7000
+
+-- !incremental_earliest_3 --
+3000
+
+-- !incremental_2_3 --
+1000
+
+-- !incremental_4_end --
+6000
+
+-- !incremental_earliest_4 --
+4000
+
+-- !incremental_3_4 --
+1000
+
+-- !incremental_5_end --
+5000
+
+-- !incremental_earliest_5 --
+5000
+
+-- !incremental_4_5 --
+1000
+
+-- !incremental_6_end --
+4000
+
+-- !incremental_earliest_6 --
+6000
+
+-- !incremental_5_6 --
+1000
+
+-- !incremental_7_end --
+3000
+
+-- !incremental_earliest_7 --
+7000
+
+-- !incremental_6_7 --
+1000
+
+-- !incremental_8_end --
+2000
+
+-- !incremental_earliest_8 --
+8000
+
+-- !incremental_7_8 --
+1000
+
+-- !incremental_9_end --
+1000
+
+-- !incremental_earliest_9 --
+9000
+
+-- !incremental_8_9 --
+1000
+
+-- !incremental_10_end --
+0
+
+-- !incremental_earliest_10 --
+10000
+
+-- !incremental_9_10 --
+1000
+
+-- !incremental_1_end --
+9000
+
+-- !incremental_earliest_1 --
+1000
+
+-- !incremental_2_end --
+8000
+
+-- !incremental_earliest_2 --
+2000
+
+-- !incremental_1_2 --
+1000
+
+-- !incremental_3_end --
+7000
+
+-- !incremental_earliest_3 --
+3000
+
+-- !incremental_2_3 --
+1000
+
+-- !incremental_4_end --
+6000
+
+-- !incremental_earliest_4 --
+4000
+
+-- !incremental_3_4 --
+1000
+
+-- !incremental_5_end --
+5000
+
+-- !incremental_earliest_5 --
+5000
+
+-- !incremental_4_5 --
+1000
+
+-- !incremental_6_end --
+4000
+
+-- !incremental_earliest_6 --
+6000
+
+-- !incremental_5_6 --
+1000
+
+-- !incremental_7_end --
+3000
+
+-- !incremental_earliest_7 --
+7000
+
+-- !incremental_6_7 --
+1000
+
+-- !incremental_8_end --
+2000
+
+-- !incremental_earliest_8 --
+8000
+
+-- !incremental_7_8 --
+1000
+
+-- !incremental_9_end --
+1000
+
+-- !incremental_earliest_9 --
+9000
+
+-- !incremental_8_9 --
+1000
+
+-- !incremental_10_end --
+0
+
+-- !incremental_earliest_10 --
+10000
+
+-- !incremental_9_10 --
+1000
+
diff --git 
a/regression-test/data/external_table_p2/hudi/test_hudi_orc_tables.out 
b/regression-test/data/external_table_p2/hudi/test_hudi_orc_tables.out
new file mode 100644
index 00000000000..9e28074dc91
--- /dev/null
+++ b/regression-test/data/external_table_p2/hudi/test_hudi_orc_tables.out
@@ -0,0 +1,15 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !cow --
+20241204190011744      20241204190011744_0_6   20241204190011744_0_0           
a99e363a-6c10-40f3-a675-9117506d1a43-0_0-38-94_20241204190011744.orc    1       
A
+20241204190011744      20241204190011744_0_7   20241204190011744_2_0           
a99e363a-6c10-40f3-a675-9117506d1a43-0_0-38-94_20241204190011744.orc    3       
C
+20241204190011744      20241204190011744_0_8   20241204190011744_4_0           
a99e363a-6c10-40f3-a675-9117506d1a43-0_0-38-94_20241204190011744.orc    5       
E
+20241204190011744      20241204190011744_0_9   20241204190011744_1_0           
a99e363a-6c10-40f3-a675-9117506d1a43-0_0-38-94_20241204190011744.orc    2       
B
+20241204190011744      20241204190011744_0_10  20241204190011744_3_0           
a99e363a-6c10-40f3-a675-9117506d1a43-0_0-38-94_20241204190011744.orc    4       
D
+
+-- !mor --
+20241204190002046      20241204190002046_0_11  20241204190002046_0_0           
b1e68412-01d6-467f-b4c2-b4b18ec71346-0_0-30-75_20241204190002046.orc    1       
A
+20241204190002046      20241204190002046_0_12  20241204190002046_2_0           
b1e68412-01d6-467f-b4c2-b4b18ec71346-0_0-30-75_20241204190002046.orc    3       
C
+20241204190002046      20241204190002046_0_13  20241204190002046_4_0           
b1e68412-01d6-467f-b4c2-b4b18ec71346-0_0-30-75_20241204190002046.orc    5       
E
+20241204190002046      20241204190002046_0_14  20241204190002046_1_0           
b1e68412-01d6-467f-b4c2-b4b18ec71346-0_0-30-75_20241204190002046.orc    2       
B
+20241204190002046      20241204190002046_0_15  20241204190002046_3_0           
b1e68412-01d6-467f-b4c2-b4b18ec71346-0_0-30-75_20241204190002046.orc    4       
D
+
diff --git 
a/regression-test/data/external_table_p2/hudi/test_hudi_schema_evolution.out 
b/regression-test/data/external_table_p2/hudi/test_hudi_schema_evolution.out
index 12dd0cf086d..da7273d4c14 100644
--- a/regression-test/data/external_table_p2/hudi/test_hudi_schema_evolution.out
+++ b/regression-test/data/external_table_p2/hudi/test_hudi_schema_evolution.out
@@ -31,3 +31,35 @@
 20241118012149007      20241118012149007_0_4   5               
185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet      
5       Eva     {"age":31.5, "address":"Chengdu"}
 20241118012149007      20241118012149007_0_5   6               
185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet      
6       Frank   {"age":29.2, "address":"Wuhan"}
 
+-- !adding_simple_columns_table --
+20241118012126237      20241118012126237_0_1   1               
5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet       
1       Alice   \N
+20241118012126237      20241118012126237_0_0   2               
5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet       
2       Bob     \N
+20241118012126237      20241118012126237_0_2   3               
5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet       
3       Cathy   \N
+20241118012132306      20241118012132306_0_3   4               
5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet       
4       David   25
+20241118012132306      20241118012132306_0_4   5               
5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet       
5       Eva     30
+20241118012132306      20241118012132306_0_5   6               
5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet       
6       Frank   28
+
+-- !altering_simple_columns_table --
+20241118012136512      20241118012136512_0_0   1               
203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet       
1       Alice   25.0
+20241118012136512      20241118012136512_0_2   2               
203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet       
2       Bob     30.0
+20241118012136512      20241118012136512_0_1   3               
203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet       
3       Cathy   28.0
+20241118012138287      20241118012138287_0_3   4               
203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet       
4       David   26.0
+20241118012138287      20241118012138287_0_4   5               
203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet       
5       Eva     31.5
+20241118012138287      20241118012138287_0_5   6               
203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet       
6       Frank   29.2
+
+-- !adding_complex_columns_table --
+20241118012144831      20241118012144831_0_1   1               
3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet      
1       Alice   {"age":25, "address":"Guangzhou", "email":null}
+20241118012144831      20241118012144831_0_0   2               
3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet      
2       Bob     {"age":30, "address":"Shanghai", "email":null}
+20241118012144831      20241118012144831_0_2   3               
3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet      
3       Cathy   {"age":28, "address":"Beijing", "email":null}
+20241118012146150      20241118012146150_0_3   4               
3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet      
4       David   {"age":25, "address":"Shenzhen", "email":"da...@example.com"}
+20241118012146150      20241118012146150_0_4   5               
3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet      
5       Eva     {"age":30, "address":"Chengdu", "email":"e...@example.com"}
+20241118012146150      20241118012146150_0_5   6               
3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet      
6       Frank   {"age":28, "address":"Wuhan", "email":"fr...@example.com"}
+
+-- !altering_complex_columns_table --
+20241118012147879      20241118012147879_0_0   1               
185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet      
1       Alice   {"age":25, "address":"Guangzhou"}
+20241118012147879      20241118012147879_0_2   2               
185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet      
2       Bob     {"age":30, "address":"Shanghai"}
+20241118012147879      20241118012147879_0_1   3               
185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet      
3       Cathy   {"age":28, "address":"Beijing"}
+20241118012149007      20241118012149007_0_3   4               
185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet      
4       David   {"age":26, "address":"Shenzhen"}
+20241118012149007      20241118012149007_0_4   5               
185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet      
5       Eva     {"age":31.5, "address":"Chengdu"}
+20241118012149007      20241118012149007_0_5   6               
185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet      
6       Frank   {"age":29.2, "address":"Wuhan"}
+
diff --git a/regression-test/data/external_table_p2/hudi/test_hudi_snapshot.out 
b/regression-test/data/external_table_p2/hudi/test_hudi_snapshot.out
index efad67ffbfa..1e151c2a86f 100644
Binary files 
a/regression-test/data/external_table_p2/hudi/test_hudi_snapshot.out and 
b/regression-test/data/external_table_p2/hudi/test_hudi_snapshot.out differ
diff --git 
a/regression-test/data/external_table_p2/hudi/test_hudi_timestamp.out 
b/regression-test/data/external_table_p2/hudi/test_hudi_timestamp.out
index dc47ff86d90..9bdb0f7cb72 100644
--- a/regression-test/data/external_table_p2/hudi/test_hudi_timestamp.out
+++ b/regression-test/data/external_table_p2/hudi/test_hudi_timestamp.out
@@ -1,6 +1,31 @@
 -- This file is automatically generated. You should know what you did if you 
want to edit this
--- !timestamp --
+-- !timestamp1 --
 20241115015956800      20241115015956800_0_2   1               
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 1       
Alice   2024-10-25T08:00
-20241115015956800      20241115015956800_0_0   2               
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 2       
Bob     2024-10-25T09:30:00
-20241115015956800      20241115015956800_0_1   3               
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 3       
Charlie 2024-10-25T11:00:00
+20241115015956800      20241115015956800_0_0   2               
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 2       
Bob     2024-10-25T09:30
+20241115015956800      20241115015956800_0_1   3               
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 3       
Charlie 2024-10-25T11:00
+
+-- !timestamp2 --
+20241115015956800      20241115015956800_0_2   1               
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 1       
Alice   2024-10-25T23:00
+20241115015956800      20241115015956800_0_0   2               
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 2       
Bob     2024-10-26T00:30
+20241115015956800      20241115015956800_0_1   3               
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 3       
Charlie 2024-10-26T02:00
+
+-- !timestamp3 --
+20241115015956800      20241115015956800_0_2   1               
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 1       
Alice   2024-10-25T15:00
+20241115015956800      20241115015956800_0_0   2               
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 2       
Bob     2024-10-25T16:30
+20241115015956800      20241115015956800_0_1   3               
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 3       
Charlie 2024-10-25T18:00
+
+-- !timestamp1 --
+20241115015956800      20241115015956800_0_2   1               
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 1       
Alice   2024-10-25T08:00
+20241115015956800      20241115015956800_0_0   2               
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 2       
Bob     2024-10-25T09:30
+20241115015956800      20241115015956800_0_1   3               
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 3       
Charlie 2024-10-25T11:00
+
+-- !timestamp2 --
+20241115015956800      20241115015956800_0_2   1               
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 1       
Alice   2024-10-25T23:00
+20241115015956800      20241115015956800_0_0   2               
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 2       
Bob     2024-10-26T00:30
+20241115015956800      20241115015956800_0_1   3               
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 3       
Charlie 2024-10-26T02:00
+
+-- !timestamp3 --
+20241115015956800      20241115015956800_0_2   1               
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 1       
Alice   2024-10-25T15:00
+20241115015956800      20241115015956800_0_0   2               
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 2       
Bob     2024-10-25T16:30
+20241115015956800      20241115015956800_0_1   3               
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 3       
Charlie 2024-10-25T18:00
 
diff --git 
a/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out 
b/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out
index a9b5d23595a..00d15805baf 100644
--- a/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out
+++ b/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out
@@ -119,3 +119,123 @@
 -- !timetravel10 --
 10000
 
+-- !timetravel1 --
+1000
+
+-- !timetravel2 --
+2000
+
+-- !timetravel3 --
+3000
+
+-- !timetravel4 --
+4000
+
+-- !timetravel5 --
+5000
+
+-- !timetravel6 --
+6000
+
+-- !timetravel7 --
+7000
+
+-- !timetravel8 --
+8000
+
+-- !timetravel9 --
+9000
+
+-- !timetravel10 --
+10000
+
+-- !timetravel1 --
+1000
+
+-- !timetravel2 --
+2000
+
+-- !timetravel3 --
+3000
+
+-- !timetravel4 --
+4000
+
+-- !timetravel5 --
+5000
+
+-- !timetravel6 --
+6000
+
+-- !timetravel7 --
+7000
+
+-- !timetravel8 --
+8000
+
+-- !timetravel9 --
+9000
+
+-- !timetravel10 --
+10000
+
+-- !timetravel1 --
+1000
+
+-- !timetravel2 --
+2000
+
+-- !timetravel3 --
+3000
+
+-- !timetravel4 --
+4000
+
+-- !timetravel5 --
+5000
+
+-- !timetravel6 --
+6000
+
+-- !timetravel7 --
+7000
+
+-- !timetravel8 --
+8000
+
+-- !timetravel9 --
+9000
+
+-- !timetravel10 --
+10000
+
+-- !timetravel1 --
+1000
+
+-- !timetravel2 --
+2000
+
+-- !timetravel3 --
+3000
+
+-- !timetravel4 --
+4000
+
+-- !timetravel5 --
+5000
+
+-- !timetravel6 --
+6000
+
+-- !timetravel7 --
+7000
+
+-- !timetravel8 --
+8000
+
+-- !timetravel9 --
+9000
+
+-- !timetravel10 --
+10000
+
diff --git 
a/regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy 
b/regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy
index f2082ef89c7..149eecf5817 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy
@@ -36,4 +36,4 @@ suite("test_hudi_catalog", 
"p2,external,hudi,external_remote,external_remote_hud
     def tables = sql """ show tables; """
     assertTrue(tables.size() > 0)
     sql """drop catalog if exists ${catalog_name};"""
-}
\ No newline at end of file
+}
diff --git 
a/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy 
b/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
index 8cc1d2a852b..885903646cc 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
@@ -60,7 +60,6 @@ suite("test_hudi_incremental", 
"p2,external,hudi,external_remote,external_remote
         "20241114152009764",
         "20241114152011901",
     ]
-    test_hudi_incremental_querys("user_activity_log_cow_non_partition", 
timestamps_cow_non_partition)
 
     // spark-sql "select distinct _hoodie_commit_time from 
user_activity_log_cow_partition order by _hoodie_commit_time;"
     def timestamps_cow_partition = [
@@ -75,7 +74,6 @@ suite("test_hudi_incremental", 
"p2,external,hudi,external_remote,external_remote
         "20241114152147114",
         "20241114152156417",
     ]
-    test_hudi_incremental_querys("user_activity_log_cow_partition", 
timestamps_cow_partition)
 
     // spark-sql "select distinct _hoodie_commit_time from 
user_activity_log_mor_non_partition order by _hoodie_commit_time;"
     def timestamps_mor_non_partition = [
@@ -90,7 +88,6 @@ suite("test_hudi_incremental", 
"p2,external,hudi,external_remote,external_remote
         "20241114152028770",
         "20241114152030746",
     ]
-    test_hudi_incremental_querys("user_activity_log_mor_non_partition", 
timestamps_mor_non_partition)
 
     // spark-sql "select distinct _hoodie_commit_time from 
user_activity_log_mor_partition order by _hoodie_commit_time;"
     def timestamps_mor_partition = [
@@ -105,7 +102,18 @@ suite("test_hudi_incremental", 
"p2,external,hudi,external_remote,external_remote
         "20241114152323587",
         "20241114152334111",
     ]
+
+    test_hudi_incremental_querys("user_activity_log_cow_non_partition", 
timestamps_cow_non_partition)
+    test_hudi_incremental_querys("user_activity_log_cow_partition", 
timestamps_cow_partition)
+    test_hudi_incremental_querys("user_activity_log_mor_non_partition", 
timestamps_mor_non_partition)
+    test_hudi_incremental_querys("user_activity_log_mor_partition", 
timestamps_mor_partition)
+    sql """set force_jni_scanner=true;"""
+    // incremental queries on cow tables are not supported by the jni reader
+    // test_hudi_incremental_querys("user_activity_log_cow_non_partition", 
timestamps_cow_non_partition)
+    // test_hudi_incremental_querys("user_activity_log_cow_partition", 
timestamps_cow_partition)
+    test_hudi_incremental_querys("user_activity_log_mor_non_partition", 
timestamps_mor_non_partition)
     test_hudi_incremental_querys("user_activity_log_mor_partition", 
timestamps_mor_partition)
+    // sql """set force_jni_scanner=false;"""
 
     sql """drop catalog if exists ${catalog_name};"""
-}
\ No newline at end of file
+}
diff --git 
a/regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy 
b/regression-test/suites/external_table_p2/hudi/test_hudi_orc_tables.groovy
similarity index 84%
copy from regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy
copy to 
regression-test/suites/external_table_p2/hudi/test_hudi_orc_tables.groovy
index f2082ef89c7..43638a23881 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_orc_tables.groovy
@@ -15,13 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("test_hudi_catalog", 
"p2,external,hudi,external_remote,external_remote_hudi") {
+suite("test_hudi_orc_tables", 
"p2,external,hudi,external_remote,external_remote_hudi") {
     String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
     if (enabled == null || !enabled.equalsIgnoreCase("true")) {
         logger.info("disable hudi test")
     }
 
-    String catalog_name = "test_hudi_catalog"
+    String catalog_name = "test_hudi_orc_tables"
     String props = context.config.otherConfigs.get("hudiEmrCatalog")
     sql """drop catalog if exists ${catalog_name};"""
     sql """
@@ -33,7 +33,9 @@ suite("test_hudi_catalog", 
"p2,external,hudi,external_remote,external_remote_hud
     sql """ switch ${catalog_name};"""
     sql """ use regression_hudi;""" 
     sql """ set enable_fallback_to_original_planner=false """
-    def tables = sql """ show tables; """
-    assertTrue(tables.size() > 0)
+    
+    qt_cow """ select * from  orc_hudi_table_cow; """
+    qt_mor """ select * from  orc_hudi_table_mor; """
+
     sql """drop catalog if exists ${catalog_name};"""
 }
\ No newline at end of file
diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy
index b247aaf4924..0da88447cde 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy
@@ -33,7 +33,18 @@ suite("test_hudi_schema_evolution", "p2,external,hudi,external_remote,external_r
     sql """ switch ${catalog_name};"""
     sql """ use regression_hudi;""" 
     sql """ set enable_fallback_to_original_planner=false """
+    
+    qt_adding_simple_columns_table """ select * from adding_simple_columns_table order by id """
+    qt_altering_simple_columns_table """ select * from altering_simple_columns_table order by id """
+    // qt_deleting_simple_columns_table """ select * from deleting_simple_columns_table order by id """
+    // qt_renaming_simple_columns_table """ select * from renaming_simple_columns_table order by id """
 
+    qt_adding_complex_columns_table """ select * from adding_complex_columns_table order by id """
+    qt_altering_complex_columns_table """ select * from altering_complex_columns_table order by id """
+    // qt_deleting_complex_columns_table """ select * from deleting_complex_columns_table order by id """
+    // qt_renaming_complex_columns_table """ select * from renaming_complex_columns_table order by id """
+    
+    sql """set force_jni_scanner = true;"""
     qt_adding_simple_columns_table """ select * from adding_simple_columns_table order by id """
     qt_altering_simple_columns_table """ select * from altering_simple_columns_table order by id """
     // qt_deleting_simple_columns_table """ select * from deleting_simple_columns_table order by id """
@@ -43,6 +54,7 @@ suite("test_hudi_schema_evolution", "p2,external,hudi,external_remote,external_r
     qt_altering_complex_columns_table """ select * from altering_complex_columns_table order by id """
     // qt_deleting_complex_columns_table """ select * from deleting_complex_columns_table order by id """
     // qt_renaming_complex_columns_table """ select * from renaming_complex_columns_table order by id """
+    sql """set force_jni_scanner = false;"""
 
     sql """drop catalog if exists ${catalog_name};"""
-}
\ No newline at end of file
+}
diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy
index 53c09e6d5a9..89d89709b3c 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy
@@ -64,7 +64,7 @@ suite("test_hudi_snapshot", "p2,external,hudi,external_remote,external_remote_hu
         qt_q09 """SELECT * FROM ${table_name} WHERE struct_element(struct_element(address, 'coordinates'), 'latitude') BETWEEN 0 AND 100 AND struct_element(struct_element(address, 'coordinates'), 'longitude') BETWEEN 0 AND 100 ORDER BY event_time LIMIT 5;"""
 
         // Query records with ratings above a specific value and limit output
-        qt_q10 """SELECT * FROM ${table_name} WHERE rating > 4.5 ORDER BY 
rating DESC LIMIT 5;"""
+        qt_q10 """SELECT * FROM ${table_name} WHERE rating > 4.5 ORDER BY 
event_time DESC LIMIT 5;"""
 
         // Query all users' signup dates and limit output
         qt_q11 """SELECT user_id, signup_date FROM ${table_name} ORDER BY 
signup_date DESC LIMIT 10;"""
@@ -79,13 +79,20 @@ suite("test_hudi_snapshot", 
"p2,external,hudi,external_remote,external_remote_hu
         qt_q14 """SELECT * FROM ${table_name} WHERE signup_date = '2024-01-15' 
ORDER BY user_id LIMIT 5;"""
 
         // Query the total count of purchases for each user and limit output
-        qt_q15 """SELECT user_id, array_size(purchases) AS purchase_count FROM 
${table_name} ORDER BY purchase_count DESC LIMIT 5;"""
+        qt_q15 """SELECT user_id, array_size(purchases) AS purchase_count FROM 
${table_name} ORDER BY user_id LIMIT 5;"""
     }
 
+    test_hudi_snapshot_querys("user_activity_log_mor_non_partition")
+    test_hudi_snapshot_querys("user_activity_log_mor_partition")
     test_hudi_snapshot_querys("user_activity_log_cow_non_partition")
     test_hudi_snapshot_querys("user_activity_log_cow_partition")
+
+    sql """set force_jni_scanner=true;"""
     test_hudi_snapshot_querys("user_activity_log_mor_non_partition")
     test_hudi_snapshot_querys("user_activity_log_mor_partition")
+    test_hudi_snapshot_querys("user_activity_log_cow_non_partition")
+    test_hudi_snapshot_querys("user_activity_log_cow_partition")
+    sql """set force_jni_scanner=false;"""
 
     sql """drop catalog if exists ${catalog_name};"""
-}
\ No newline at end of file
+}
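
The ORDER BY changes to q10 and q15 above likely make the LIMIT-ed golden output deterministic: sorting only by a non-unique value such as rating (or the derived purchase_count) leaves ties, so different readers or runs can return different rows. A hypothetical alternative that keeps the original sort key but pins ties; qt_q10_stable is an illustrative tag, not in the commit, and assumes it sits inside the same test_hudi_snapshot_querys closure where table_name is defined:

        // hypothetical: break rating ties with a second key so the top-5 rows stay stable
        qt_q10_stable """SELECT * FROM ${table_name} WHERE rating > 4.5 ORDER BY rating DESC, user_id LIMIT 5;"""
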
diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_timestamp.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_timestamp.groovy
index c1ba630e4a7..3d7bd40b2d5 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_timestamp.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_timestamp.groovy
@@ -34,8 +34,22 @@ suite("test_hudi_timestamp", "p2,external,hudi,external_remote,external_remote_h
     sql """ use regression_hudi;""" 
     sql """ set enable_fallback_to_original_planner=false """
 
-    // TODO: fix hudi timezone issue and enable this
-    // qt_timestamp """ select * from hudi_table_with_timestamp order by id; 
"""
+    def test_timestamp_different_timezones = {
+        sql """set time_zone = 'America/Los_Angeles';"""
+        qt_timestamp1 """ select * from hudi_table_with_timestamp order by id; 
"""
+        sql """set time_zone = 'Asia/Shanghai';"""
+        qt_timestamp2 """ select * from hudi_table_with_timestamp order by id; 
"""
+        sql """set time_zone = 'UTC';"""
+        qt_timestamp3 """ select * from hudi_table_with_timestamp order by id; 
"""
+    }
+
+    // test native reader
+    test_timestamp_different_timezones()
+    sql """ set force_jni_scanner = true; """
+    // test jni reader
+    test_timestamp_different_timezones()
+    sql """ set force_jni_scanner = false; """
+
 
     sql """drop catalog if exists ${catalog_name};"""
 }
@@ -59,4 +73,4 @@ suite("test_hudi_timestamp", "p2,external,hudi,external_remote,external_remote_h
 // INSERT OVERWRITE hudi_table_with_timestamp VALUES
 // ('1', 'Alice', timestamp('2024-10-25 08:00:00')),
 // ('2', 'Bob', timestamp('2024-10-25 09:30:00')),
-// ('3', 'Charlie', timestamp('2024-10-25 11:00:00'));
\ No newline at end of file
+// ('3', 'Charlie', timestamp('2024-10-25 11:00:00'));
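
The same coverage, both readers crossed with several session time zones, could also be written as a nested loop. This is only a sketch and not part of the commit; it relies on sql returning result rows and on logger, both already used in these suites:

    // iterate readers x time zones instead of repeating the closure call
    [false, true].each { useJni ->
        sql """set force_jni_scanner = ${useJni};"""
        ["America/Los_Angeles", "Asia/Shanghai", "UTC"].each { zone ->
            sql """set time_zone = '${zone}';"""
            def rows = sql """ select * from hudi_table_with_timestamp order by id; """
            logger.info("force_jni_scanner=${useJni}, time_zone=${zone} -> ${rows.size()} rows")
        }
    }
    sql """set force_jni_scanner = false;"""
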
diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy
index 4d458dc4381..cceeaa41220 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy
@@ -54,7 +54,6 @@ suite("test_hudi_timetravel", "p2,external,hudi,external_remote,external_remote_
         "20241114152009764",
         "20241114152011901",
     ]
-    test_hudi_timetravel_querys("user_activity_log_cow_non_partition", 
timestamps_cow_non_partition)
 
     // spark-sql "select distinct _hoodie_commit_time from 
user_activity_log_cow_partition order by _hoodie_commit_time;"
     def timestamps_cow_partition = [
@@ -69,7 +68,6 @@ suite("test_hudi_timetravel", "p2,external,hudi,external_remote,external_remote_
         "20241114152147114",
         "20241114152156417",
     ]
-    test_hudi_timetravel_querys("user_activity_log_cow_partition", 
timestamps_cow_partition)
 
     // spark-sql "select distinct _hoodie_commit_time from 
user_activity_log_mor_non_partition order by _hoodie_commit_time;"
     def timestamps_mor_non_partition = [
@@ -84,7 +82,6 @@ suite("test_hudi_timetravel", "p2,external,hudi,external_remote,external_remote_
         "20241114152028770",
         "20241114152030746",
     ]
-    test_hudi_timetravel_querys("user_activity_log_mor_non_partition", 
timestamps_mor_non_partition)
 
     // spark-sql "select distinct _hoodie_commit_time from 
user_activity_log_mor_partition order by _hoodie_commit_time;"
     def timestamps_mor_partition = [
@@ -99,7 +96,17 @@ suite("test_hudi_timetravel", "p2,external,hudi,external_remote,external_remote_
         "20241114152323587",
         "20241114152334111",
     ]
+
+    test_hudi_timetravel_querys("user_activity_log_cow_non_partition", 
timestamps_cow_non_partition)
+    test_hudi_timetravel_querys("user_activity_log_cow_partition", 
timestamps_cow_partition)
+    test_hudi_timetravel_querys("user_activity_log_mor_non_partition", 
timestamps_mor_non_partition)
+    test_hudi_timetravel_querys("user_activity_log_mor_partition", 
timestamps_mor_partition)
+    sql """set force_jni_scanner=true;"""
+    test_hudi_timetravel_querys("user_activity_log_cow_non_partition", 
timestamps_cow_non_partition)
+    test_hudi_timetravel_querys("user_activity_log_cow_partition", 
timestamps_cow_partition)
+    test_hudi_timetravel_querys("user_activity_log_mor_non_partition", 
timestamps_mor_non_partition)
     test_hudi_timetravel_querys("user_activity_log_mor_partition", 
timestamps_mor_partition)
+    sql """set force_jni_scanner=false;"""
 
     sql """drop catalog if exists ${catalog_name};"""
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
