This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dcca9db48c [feat](paimon) integrate paimon-cpp reader (#60676)
7dcca9db48c is described below

commit 7dcca9db48c6b8b056c652afc84adeee920816c6
Author: Chenjunwei <[email protected]>
AuthorDate: Sat Feb 21 10:17:06 2026 +0800

    [feat](paimon) integrate paimon-cpp reader (#60676)
    
    Issue Number: #56005
    
    Co-authored-by: morningman <[email protected]>
---
 be/CMakeLists.txt                                  | 124 ++++
 be/cmake/thirdparty.cmake                          |  15 +
 be/src/vec/exec/format/table/paimon_cpp_reader.cpp | 319 ++++++++++
 be/src/vec/exec/format/table/paimon_cpp_reader.h   |  95 +++
 .../exec/format/table/paimon_doris_file_system.cpp | 664 +++++++++++++++++++++
 .../exec/format/table/paimon_doris_file_system.h   |  25 +
 .../format/table/paimon_predicate_converter.cpp    | 659 ++++++++++++++++++++
 .../exec/format/table/paimon_predicate_converter.h |  90 +++
 be/src/vec/exec/scan/file_scanner.cpp              |  29 +-
 .../exec/format/table/paimon_cpp_reader_test.cpp   |  95 +++
 .../java/org/apache/doris/paimon/PaimonUtils.java  |  14 +-
 .../apache/doris/datasource/paimon/PaimonUtil.java |  20 +
 .../datasource/paimon/source/PaimonScanNode.java   |  14 +-
 .../datasource/paimon/source/PaimonSource.java     |   9 +
 .../java/org/apache/doris/qe/SessionVariable.java  |  25 +-
 gensrc/thrift/PaloInternalService.thrift           |   2 +
 16 files changed, 2185 insertions(+), 14 deletions(-)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index fb0de111359..924a88706bb 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -150,6 +150,17 @@ message(STATUS "build task executor simulator: 
${BUILD_TASK_EXECUTOR_SIMULATOR}"
 option(BUILD_FILE_CACHE_LRU_TOOL "ON for building file cache lru tool or OFF 
for not" OFF)
 message(STATUS "build file cache lru tool: ${BUILD_FILE_CACHE_LRU_TOOL}")
 
+option(ENABLE_PAIMON_CPP "Enable Paimon C++ integration" ON)
+set(PAIMON_HOME "" CACHE PATH "Paimon install prefix")
+
+# Allow env to override when reconfiguring (avoid picking /usr/local).
+if (DEFINED ENV{ENABLE_PAIMON_CPP})
+    set(ENABLE_PAIMON_CPP "$ENV{ENABLE_PAIMON_CPP}" CACHE BOOL "" FORCE)
+endif()
+if (DEFINED ENV{PAIMON_HOME} AND NOT PAIMON_HOME)
+    set(PAIMON_HOME "$ENV{PAIMON_HOME}" CACHE PATH "" FORCE)
+endif()
+
 set(CMAKE_SKIP_RPATH TRUE)
 set(Boost_USE_STATIC_LIBS ON)
 set(Boost_USE_STATIC_RUNTIME ON)
@@ -550,6 +561,10 @@ set(COMMON_THIRDPARTY
     ${COMMON_THIRDPARTY}
 )
 
+if (ENABLE_PAIMON_CPP)
+    message(STATUS "Paimon C++ enabled: legacy thirdparty static linkage mode")
+endif()
+
 if ((ARCH_AMD64 OR ARCH_AARCH64) AND OS_LINUX)
     add_library(hadoop_hdfs STATIC IMPORTED)
     set_target_properties(hadoop_hdfs PROPERTIES IMPORTED_LOCATION 
${THIRDPARTY_DIR}/lib/hadoop_hdfs_3_4/native/libhdfs.a)
@@ -577,6 +592,13 @@ endif()
 if (absl_FOUND)
     set(COMMON_THIRDPARTY
         ${COMMON_THIRDPARTY}
+        absl::cord
+        absl::cord_internal
+        absl::cordz_functions
+        absl::cordz_info
+        absl::cordz_update_scope
+        absl::cordz_update_tracker
+        absl::crc_cord_state
         absl::flags
         absl::random_random
         absl::spinlock_wait
@@ -602,6 +624,80 @@ if (BUILD_BENCHMARK)
     )
 endif()
 
+set(PAIMON_FACTORY_REGISTRY_LIBS)
+set(PAIMON_ARROW_CORE_LIB)
+set(PAIMON_ARROW_FILESYSTEM_LIB)
+set(PAIMON_ARROW_DATASET_LIB)
+set(PAIMON_ARROW_ACERO_LIB)
+if (ENABLE_PAIMON_CPP)
+    set(_paimon_arrow_core_candidates
+        ${THIRDPARTY_DIR}/paimon-cpp/lib64/paimon_deps/libarrow.a
+        ${THIRDPARTY_DIR}/lib64/libarrow.a
+        ${THIRDPARTY_DIR}/lib/libarrow.a
+    )
+    foreach(_paimon_arrow_core_candidate IN LISTS 
_paimon_arrow_core_candidates)
+        if (EXISTS "${_paimon_arrow_core_candidate}")
+            add_library(paimon_arrow_core STATIC IMPORTED)
+            set_target_properties(paimon_arrow_core PROPERTIES
+                IMPORTED_LOCATION ${_paimon_arrow_core_candidate})
+            set(PAIMON_ARROW_CORE_LIB paimon_arrow_core)
+            break()
+        endif()
+    endforeach()
+    set(_paimon_arrow_filesystem_candidates
+        ${THIRDPARTY_DIR}/paimon-cpp/lib64/paimon_deps/libarrow_filesystem.a
+        ${THIRDPARTY_DIR}/lib64/libarrow_filesystem.a
+        ${THIRDPARTY_DIR}/lib/libarrow_filesystem.a
+    )
+    foreach(_paimon_arrow_filesystem_candidate IN LISTS 
_paimon_arrow_filesystem_candidates)
+        if (EXISTS "${_paimon_arrow_filesystem_candidate}")
+            add_library(paimon_arrow_filesystem STATIC IMPORTED)
+            set_target_properties(paimon_arrow_filesystem PROPERTIES
+                IMPORTED_LOCATION ${_paimon_arrow_filesystem_candidate})
+            set(PAIMON_ARROW_FILESYSTEM_LIB paimon_arrow_filesystem)
+            break()
+        endif()
+    endforeach()
+    set(_paimon_arrow_dataset_candidates
+        ${THIRDPARTY_DIR}/paimon-cpp/lib64/paimon_deps/libarrow_dataset.a
+        ${THIRDPARTY_DIR}/lib64/libarrow_dataset.a
+        ${THIRDPARTY_DIR}/lib/libarrow_dataset.a
+    )
+    foreach(_paimon_arrow_dataset_candidate IN LISTS 
_paimon_arrow_dataset_candidates)
+        if (EXISTS "${_paimon_arrow_dataset_candidate}")
+            add_library(paimon_arrow_dataset STATIC IMPORTED)
+            set_target_properties(paimon_arrow_dataset PROPERTIES
+                IMPORTED_LOCATION ${_paimon_arrow_dataset_candidate})
+            set(PAIMON_ARROW_DATASET_LIB paimon_arrow_dataset)
+            break()
+        endif()
+    endforeach()
+    set(_paimon_arrow_acero_candidates
+        ${THIRDPARTY_DIR}/paimon-cpp/lib64/paimon_deps/libarrow_acero.a
+        ${THIRDPARTY_DIR}/lib64/libarrow_acero.a
+        ${THIRDPARTY_DIR}/lib/libarrow_acero.a
+    )
+    foreach(_paimon_arrow_acero_candidate IN LISTS 
_paimon_arrow_acero_candidates)
+        if (EXISTS "${_paimon_arrow_acero_candidate}")
+            add_library(paimon_arrow_acero STATIC IMPORTED)
+            set_target_properties(paimon_arrow_acero PROPERTIES
+                IMPORTED_LOCATION ${_paimon_arrow_acero_candidate})
+            set(PAIMON_ARROW_ACERO_LIB paimon_arrow_acero)
+            break()
+        endif()
+    endforeach()
+    if (PAIMON_ARROW_DATASET_LIB)
+        # paimon_parquet_file_format depends on Arrow Dataset symbols.
+        # Force-link it only when arrow_dataset is available.
+        set(PAIMON_FACTORY_REGISTRY_LIBS
+            paimon_parquet_file_format
+        )
+        list(REMOVE_ITEM COMMON_THIRDPARTY ${PAIMON_FACTORY_REGISTRY_LIBS})
+    else()
+        message(STATUS "Paimon C++: libarrow_dataset.a not found, keep 
paimon_parquet_file_format as regular static lib")
+    endif()
+endif()
+
 set(DORIS_DEPENDENCIES
     ${DORIS_DEPENDENCIES}
     ${WL_START_GROUP}
@@ -619,6 +715,34 @@ set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} 
clucene-core-static)
 set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} clucene-shared-static)
 set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} clucene-contribs-lib)
 
+if (ENABLE_PAIMON_CPP)
+    if (PAIMON_FACTORY_REGISTRY_LIBS)
+        set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES}
+            -Wl,--whole-archive
+            ${PAIMON_FACTORY_REGISTRY_LIBS}
+            -Wl,--no-whole-archive)
+    endif()
+    if (PAIMON_ARROW_CORE_LIB)
+        set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} ${PAIMON_ARROW_CORE_LIB})
+    endif()
+    if (PAIMON_ARROW_FILESYSTEM_LIB)
+        set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} 
${PAIMON_ARROW_FILESYSTEM_LIB})
+    endif()
+    if (PAIMON_ARROW_DATASET_LIB)
+        set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} 
${PAIMON_ARROW_DATASET_LIB})
+    endif()
+    if (PAIMON_ARROW_ACERO_LIB)
+        set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} ${PAIMON_ARROW_ACERO_LIB})
+    endif()
+
+    # paimon-cpp internal dependencies (renamed with _paimon suffix)
+    # These must come after paimon libraries to resolve symbols.
+    set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} roaring_bitmap_paimon)
+    set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} xxhash_paimon)
+    set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} fmt_paimon)
+    set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} tbb_paimon)
+endif()
+
 set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} ${WL_END_GROUP})
 
 # Add all external dependencies. They should come after the palo libs.
diff --git a/be/cmake/thirdparty.cmake b/be/cmake/thirdparty.cmake
index 9bb7b8ba748..441ebe8dc73 100644
--- a/be/cmake/thirdparty.cmake
+++ b/be/cmake/thirdparty.cmake
@@ -179,3 +179,18 @@ add_thirdparty(icudata LIB64)
 
 
 add_thirdparty(pugixml LIB64)
+
+if (ENABLE_PAIMON_CPP)
+    add_thirdparty(paimon LIB64)
+    add_thirdparty(paimon_parquet_file_format LIB64)
+    add_thirdparty(paimon_orc_file_format LIB64)
+    add_thirdparty(paimon_blob_file_format LIB64)
+    add_thirdparty(paimon_local_file_system LIB64)
+    add_thirdparty(paimon_file_index LIB64)
+    add_thirdparty(paimon_global_index LIB64)
+
+    add_thirdparty(roaring_bitmap_paimon LIB64)
+    add_thirdparty(xxhash_paimon LIB64)
+    add_thirdparty(fmt_paimon LIB64)
+    add_thirdparty(tbb_paimon LIB64)
+endif()
diff --git a/be/src/vec/exec/format/table/paimon_cpp_reader.cpp 
b/be/src/vec/exec/format/table/paimon_cpp_reader.cpp
new file mode 100644
index 00000000000..0bb89d0c73e
--- /dev/null
+++ b/be/src/vec/exec/format/table/paimon_cpp_reader.cpp
@@ -0,0 +1,319 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "paimon_cpp_reader.h"
+
+#include <algorithm>
+#include <mutex>
+#include <utility>
+
+#include "arrow/c/bridge.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "paimon/defs.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/read_context.h"
+#include "paimon/table/source/table_read.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
+#include "util/url_coding.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/exec/format/table/paimon_doris_file_system.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+namespace {
+constexpr const char* VALUE_KIND_FIELD = "_VALUE_KIND";
+
+} // namespace
+
+PaimonCppReader::PaimonCppReader(const std::vector<SlotDescriptor*>& 
file_slot_descs,
+                                 RuntimeState* state, RuntimeProfile* profile,
+                                 const TFileRangeDesc& range,
+                                 const TFileScanRangeParams* range_params)
+        : _file_slot_descs(file_slot_descs),
+          _state(state),
+          _profile(profile),
+          _range(range),
+          _range_params(range_params) {
+    TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, 
_ctzz);
+    if (range.__isset.table_format_params &&
+        range.table_format_params.__isset.table_level_row_count) {
+        _remaining_table_level_row_count = 
range.table_format_params.table_level_row_count;
+    } else {
+        _remaining_table_level_row_count = -1;
+    }
+}
+
+PaimonCppReader::~PaimonCppReader() = default;
+
+Status PaimonCppReader::init_reader() {
+    if (_push_down_agg_type == TPushAggOp::type::COUNT && 
_remaining_table_level_row_count >= 0) {
+        return Status::OK();
+    }
+    return _init_paimon_reader();
+}
+
+Status PaimonCppReader::get_next_block(Block* block, size_t* read_rows, bool* 
eof) {
+    if (_push_down_agg_type == TPushAggOp::type::COUNT && 
_remaining_table_level_row_count >= 0) {
+        auto rows = std::min(_remaining_table_level_row_count,
+                             (int64_t)_state->query_options().batch_size);
+        _remaining_table_level_row_count -= rows;
+        auto mutate_columns = block->mutate_columns();
+        for (auto& col : mutate_columns) {
+            col->resize(rows);
+        }
+        block->set_columns(std::move(mutate_columns));
+        *read_rows = rows;
+        *eof = false;
+        if (_remaining_table_level_row_count == 0) {
+            *eof = true;
+        }
+        return Status::OK();
+    }
+
+    if (!_batch_reader) {
+        return Status::InternalError("paimon-cpp reader is not initialized");
+    }
+
+    if (_col_name_to_block_idx.empty()) {
+        _col_name_to_block_idx = block->get_name_to_pos_map();
+    }
+
+    auto batch_result = _batch_reader->NextBatch();
+    if (!batch_result.ok()) {
+        return Status::InternalError("paimon-cpp read batch failed: {}",
+                                     batch_result.status().ToString());
+    }
+    auto batch = std::move(batch_result).value();
+    if (paimon::BatchReader::IsEofBatch(batch)) {
+        *read_rows = 0;
+        *eof = true;
+        return Status::OK();
+    }
+
+    arrow::Result<std::shared_ptr<arrow::RecordBatch>> import_result =
+            arrow::ImportRecordBatch(batch.first.get(), batch.second.get());
+    if (!import_result.ok()) {
+        return Status::InternalError("failed to import paimon-cpp arrow batch: 
{}",
+                                     import_result.status().message());
+    }
+
+    auto record_batch = std::move(import_result).ValueUnsafe();
+    const auto num_rows = static_cast<size_t>(record_batch->num_rows());
+    const auto num_columns = record_batch->num_columns();
+    for (int c = 0; c < num_columns; ++c) {
+        const auto& field = record_batch->schema()->field(c);
+        if (field->name() == VALUE_KIND_FIELD) {
+            continue;
+        }
+
+        auto it = _col_name_to_block_idx.find(field->name());
+        if (it == _col_name_to_block_idx.end()) {
+            // Skip columns that are not in the block (e.g., partition columns 
handled elsewhere)
+            continue;
+        }
+        const vectorized::ColumnWithTypeAndName& column_with_name =
+                block->get_by_position(it->second);
+        try {
+            
RETURN_IF_ERROR(column_with_name.type->get_serde()->read_column_from_arrow(
+                    column_with_name.column->assume_mutable_ref(), 
record_batch->column(c).get(), 0,
+                    num_rows, _ctzz));
+        } catch (Exception& e) {
+            return Status::InternalError("Failed to convert from arrow to 
block: {}", e.what());
+        }
+    }
+
+    *read_rows = num_rows;
+    *eof = false;
+    return Status::OK();
+}
+
+Status PaimonCppReader::get_columns(std::unordered_map<std::string, 
DataTypePtr>* name_to_type,
+                                    std::unordered_set<std::string>* 
missing_cols) {
+    for (const auto& slot : _file_slot_descs) {
+        name_to_type->emplace(slot->col_name(), slot->type());
+    }
+    return Status::OK();
+}
+
+Status PaimonCppReader::close() {
+    if (_batch_reader) {
+        _batch_reader->Close();
+    }
+    return Status::OK();
+}
+
+Status PaimonCppReader::_init_paimon_reader() {
+    register_paimon_doris_file_system();
+    RETURN_IF_ERROR(_decode_split(&_split));
+
+    auto table_path_opt = _resolve_table_path();
+    if (!table_path_opt.has_value()) {
+        return Status::InternalError(
+                "paimon-cpp missing paimon_table; cannot resolve paimon table 
root path");
+    }
+    auto options = _build_options();
+    auto read_columns = _build_read_columns();
+
+    // Avoid moving strings across module boundaries to prevent allocator 
mismatches in ASAN builds.
+    std::string table_path = table_path_opt.value();
+    static std::once_flag options_log_once;
+    std::call_once(options_log_once, [&]() {
+        auto has_key = [&](const char* key) {
+            auto it = options.find(key);
+            return (it != options.end() && !it->second.empty()) ? "set" : 
"empty";
+        };
+        auto value_or = [&](const char* key) {
+            auto it = options.find(key);
+            return it != options.end() ? it->second : std::string("<unset>");
+        };
+        LOG(INFO) << "paimon-cpp options summary: table_path=" << table_path
+                  << " AWS_ACCESS_KEY=" << has_key("AWS_ACCESS_KEY")
+                  << " AWS_SECRET_KEY=" << has_key("AWS_SECRET_KEY")
+                  << " AWS_TOKEN=" << has_key("AWS_TOKEN")
+                  << " AWS_ENDPOINT=" << value_or("AWS_ENDPOINT")
+                  << " AWS_REGION=" << value_or("AWS_REGION")
+                  << " use_path_style=" << value_or("use_path_style")
+                  << " fs.oss.endpoint=" << value_or("fs.oss.endpoint")
+                  << " fs.s3a.endpoint=" << value_or("fs.s3a.endpoint");
+    });
+    paimon::ReadContextBuilder builder(table_path);
+    if (!read_columns.empty()) {
+        builder.SetReadSchema(read_columns);
+    }
+    if (!options.empty()) {
+        builder.SetOptions(options);
+    }
+    if (_predicate) {
+        builder.SetPredicate(_predicate);
+        builder.EnablePredicateFilter(true);
+    }
+
+    auto context_result = builder.Finish();
+    if (!context_result.ok()) {
+        return Status::InternalError("paimon-cpp build read context failed: 
{}",
+                                     context_result.status().ToString());
+    }
+    auto context = std::move(context_result).value();
+
+    auto table_read_result = paimon::TableRead::Create(std::move(context));
+    if (!table_read_result.ok()) {
+        return Status::InternalError("paimon-cpp create table read failed: {}",
+                                     table_read_result.status().ToString());
+    }
+    auto table_read = std::move(table_read_result).value();
+    auto reader_result = table_read->CreateReader(_split);
+    if (!reader_result.ok()) {
+        return Status::InternalError("paimon-cpp create reader failed: {}",
+                                     reader_result.status().ToString());
+    }
+    _table_read = std::move(table_read);
+    _batch_reader = std::move(reader_result).value();
+    return Status::OK();
+}
+
+Status PaimonCppReader::_decode_split(std::shared_ptr<paimon::Split>* split) {
+    if (!_range.__isset.table_format_params || 
!_range.table_format_params.__isset.paimon_params ||
+        !_range.table_format_params.paimon_params.__isset.paimon_split) {
+        return Status::InternalError("paimon-cpp missing paimon_split in scan 
range");
+    }
+    const auto& encoded_split = 
_range.table_format_params.paimon_params.paimon_split;
+    std::string decoded_split;
+    if (!base64_decode(encoded_split, &decoded_split)) {
+        return Status::InternalError("paimon-cpp base64 decode paimon_split 
failed");
+    }
+    auto pool = paimon::GetDefaultPool();
+    auto split_result =
+            paimon::Split::Deserialize(decoded_split.data(), 
decoded_split.size(), pool);
+    if (!split_result.ok()) {
+        return Status::InternalError("paimon-cpp deserialize split failed: {}",
+                                     split_result.status().ToString());
+    }
+    *split = std::move(split_result).value();
+    return Status::OK();
+}
+
+std::optional<std::string> PaimonCppReader::_resolve_table_path() const {
+    if (_range.__isset.table_format_params && 
_range.table_format_params.__isset.paimon_params &&
+        _range.table_format_params.paimon_params.__isset.paimon_table &&
+        !_range.table_format_params.paimon_params.paimon_table.empty()) {
+        return _range.table_format_params.paimon_params.paimon_table;
+    }
+    return std::nullopt;
+}
+
+std::vector<std::string> PaimonCppReader::_build_read_columns() const {
+    std::vector<std::string> columns;
+    columns.reserve(_file_slot_descs.size());
+    for (const auto& slot : _file_slot_descs) {
+        columns.emplace_back(slot->col_name());
+    }
+    return columns;
+}
+
+std::map<std::string, std::string> PaimonCppReader::_build_options() const {
+    std::map<std::string, std::string> options;
+    if (_range.__isset.table_format_params && 
_range.table_format_params.__isset.paimon_params &&
+        _range.table_format_params.paimon_params.__isset.paimon_options) {
+        
options.insert(_range.table_format_params.paimon_params.paimon_options.begin(),
+                       
_range.table_format_params.paimon_params.paimon_options.end());
+    }
+
+    if (_range_params && _range_params->__isset.properties && 
!_range_params->properties.empty()) {
+        for (const auto& kv : _range_params->properties) {
+            options[kv.first] = kv.second;
+        }
+    } else if (_range.__isset.table_format_params &&
+               _range.table_format_params.__isset.paimon_params &&
+               _range.table_format_params.paimon_params.__isset.hadoop_conf) {
+        for (const auto& kv : 
_range.table_format_params.paimon_params.hadoop_conf) {
+            options[kv.first] = kv.second;
+        }
+    }
+
+    auto copy_if_missing = [&](const char* from_key, const char* to_key) {
+        if (options.find(to_key) != options.end()) {
+            return;
+        }
+        auto it = options.find(from_key);
+        if (it != options.end() && !it->second.empty()) {
+            options[to_key] = it->second;
+        }
+    };
+
+    // Map common OSS/S3 Hadoop configs to Doris S3 property keys.
+    copy_if_missing("fs.oss.accessKeyId", "AWS_ACCESS_KEY");
+    copy_if_missing("fs.oss.accessKeySecret", "AWS_SECRET_KEY");
+    copy_if_missing("fs.oss.sessionToken", "AWS_TOKEN");
+    copy_if_missing("fs.oss.endpoint", "AWS_ENDPOINT");
+    copy_if_missing("fs.oss.region", "AWS_REGION");
+    copy_if_missing("fs.s3a.access.key", "AWS_ACCESS_KEY");
+    copy_if_missing("fs.s3a.secret.key", "AWS_SECRET_KEY");
+    copy_if_missing("fs.s3a.session.token", "AWS_TOKEN");
+    copy_if_missing("fs.s3a.endpoint", "AWS_ENDPOINT");
+    copy_if_missing("fs.s3a.region", "AWS_REGION");
+    copy_if_missing("fs.s3a.path.style.access", "use_path_style");
+
+    options[paimon::Options::FILE_SYSTEM] = "doris";
+    return options;
+}
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/paimon_cpp_reader.h 
b/be/src/vec/exec/format/table/paimon_cpp_reader.h
new file mode 100644
index 00000000000..a1e8cfc1917
--- /dev/null
+++ b/be/src/vec/exec/format/table/paimon_cpp_reader.h
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <map>
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "cctz/time_zone.h"
+#include "common/status.h"
+#include "exec/olap_common.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/table/source/split.h"
+#include "vec/exec/format/generic_reader.h"
+
+namespace paimon {
+class TableRead;
+class Predicate;
+} // namespace paimon
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+class SlotDescriptor;
+} // namespace doris
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+class Block;
+
+class PaimonCppReader : public GenericReader {
+    ENABLE_FACTORY_CREATOR(PaimonCppReader);
+
+public:
+    PaimonCppReader(const std::vector<SlotDescriptor*>& file_slot_descs, 
RuntimeState* state,
+                    RuntimeProfile* profile, const TFileRangeDesc& range,
+                    const TFileScanRangeParams* range_params);
+    ~PaimonCppReader() override;
+
+    Status init_reader();
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+    Status get_columns(std::unordered_map<std::string, DataTypePtr>* 
name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override;
+    Status close() override;
+    void set_predicate(std::shared_ptr<paimon::Predicate> predicate) {
+        _predicate = std::move(predicate);
+    }
+
+private:
+    Status _init_paimon_reader();
+    Status _decode_split(std::shared_ptr<paimon::Split>* split);
+    // Resolve paimon table root path for schema/manifest lookup.
+    std::optional<std::string> _resolve_table_path() const;
+    std::vector<std::string> _build_read_columns() const;
+    std::map<std::string, std::string> _build_options() const;
+
+    const std::vector<SlotDescriptor*>& _file_slot_descs;
+    RuntimeState* _state = nullptr;
+    [[maybe_unused]] RuntimeProfile* _profile = nullptr;
+    const TFileRangeDesc& _range;
+    const TFileScanRangeParams* _range_params = nullptr;
+
+    std::shared_ptr<paimon::Split> _split;
+    std::unique_ptr<paimon::TableRead> _table_read;
+    std::unique_ptr<paimon::BatchReader> _batch_reader;
+    std::shared_ptr<paimon::Predicate> _predicate;
+
+    std::unordered_map<std::string, uint32_t> _col_name_to_block_idx;
+    int64_t _remaining_table_level_row_count = -1;
+    cctz::time_zone _ctzz;
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/paimon_doris_file_system.cpp 
b/be/src/vec/exec/format/table/paimon_doris_file_system.cpp
new file mode 100644
index 00000000000..abe6fb0c7cb
--- /dev/null
+++ b/be/src/vec/exec/format/table/paimon_doris_file_system.cpp
@@ -0,0 +1,664 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "paimon_doris_file_system.h"
+
+#include <algorithm>
+#include <cctype>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "common/status.h"
+#include "gen_cpp/Types_types.h"
+#include "io/file_factory.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/file_system.h"
+#include "io/fs/file_writer.h"
+#include "io/fs/local_file_system.h"
+#include "paimon/factories/factory.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/file_system_factory.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+struct ParsedUri {
+    std::string scheme;
+    std::string authority;
+};
+
+std::string to_lower(std::string value) {
+    std::ranges::transform(value, value.begin(),
+                           [](unsigned char c) { return 
static_cast<char>(std::tolower(c)); });
+    return value;
+}
+
+ParsedUri parse_uri(const std::string& path) {
+    ParsedUri parsed;
+    size_t scheme_pos = path.find("://");
+    size_t delim_len = 3;
+    if (scheme_pos == std::string::npos) {
+        scheme_pos = path.find(":/");
+        delim_len = 2;
+    }
+    if (scheme_pos == std::string::npos || scheme_pos == 0) {
+        return parsed;
+    }
+    parsed.scheme = to_lower(path.substr(0, scheme_pos));
+    size_t authority_start = scheme_pos + delim_len;
+    if (authority_start >= path.size() || path[authority_start] == '/') {
+        return parsed;
+    }
+    size_t next_slash = path.find('/', authority_start);
+    if (next_slash == std::string::npos) {
+        parsed.authority = path.substr(authority_start);
+    } else {
+        parsed.authority = path.substr(authority_start, next_slash - 
authority_start);
+    }
+    return parsed;
+}
+
+bool is_s3_scheme(const std::string& scheme) {
+    return scheme == "s3" || scheme == "s3a" || scheme == "s3n" || scheme == 
"oss" ||
+           scheme == "obs" || scheme == "cos" || scheme == "cosn" || scheme == 
"gs" ||
+           scheme == "abfs" || scheme == "abfss" || scheme == "wasb" || scheme 
== "wasbs";
+}
+
+bool is_hdfs_scheme(const std::string& scheme) {
+    return scheme == "hdfs" || scheme == "viewfs" || scheme == "local";
+}
+
+bool is_http_scheme(const std::string& scheme) {
+    return scheme == "http" || scheme == "https";
+}
+
+doris::TFileType::type map_scheme_to_file_type(const std::string& scheme) {
+    if (scheme.empty()) {
+        return doris::TFileType::FILE_HDFS;
+    }
+    if (scheme == "file") {
+        return doris::TFileType::FILE_LOCAL;
+    }
+    if (is_hdfs_scheme(scheme)) {
+        return doris::TFileType::FILE_HDFS;
+    }
+    if (is_s3_scheme(scheme)) {
+        return doris::TFileType::FILE_S3;
+    }
+    if (is_http_scheme(scheme)) {
+        return doris::TFileType::FILE_HTTP;
+    }
+    if (scheme == "ofs" || scheme == "gfs" || scheme == "jfs") {
+        return doris::TFileType::FILE_BROKER;
+    }
+    return doris::TFileType::FILE_HDFS;
+}
+
+std::string replace_scheme(const std::string& path, const std::string& scheme) 
{
+    size_t scheme_pos = path.find("://");
+    size_t delim_len = 3;
+    if (scheme_pos == std::string::npos) {
+        scheme_pos = path.find(":/");
+        delim_len = 2;
+    }
+    if (scheme_pos == std::string::npos) {
+        return path;
+    }
+    return scheme + "://" + path.substr(scheme_pos + delim_len);
+}
+
+std::string normalize_local_path(const std::string& path) {
+    if (!path.starts_with("file:")) {
+        return path;
+    }
+    constexpr size_t file_prefix_len = 5;
+    size_t start = file_prefix_len;
+    if (path.compare(start, 2, "//") == 0 && path.size() - start > 2) {
+        size_t next_slash = path.find('/', start + 2);
+        if (next_slash == std::string::npos) {
+            return "";
+        }
+        start = next_slash;
+    }
+    return path.substr(start);
+}
+
+std::string normalize_path_for_type(const std::string& path, const 
std::string& scheme,
+                                    doris::TFileType::type type) {
+    if (type == doris::TFileType::FILE_LOCAL) {
+        return normalize_local_path(path);
+    }
+    if (type == doris::TFileType::FILE_S3 && scheme != "s3" && 
!is_http_scheme(scheme)) {
+        return replace_scheme(path, "s3");
+    }
+    return path;
+}
+
+std::string build_fs_cache_key(doris::TFileType::type type, const ParsedUri& 
uri,
+                               const std::string& default_fs_name) {
+    switch (type) {
+    case doris::TFileType::FILE_LOCAL:
+        return "local";
+    case doris::TFileType::FILE_S3:
+        return "s3://" + uri.authority;
+    case doris::TFileType::FILE_HTTP:
+        return "http://"; + uri.authority;
+    case doris::TFileType::FILE_BROKER:
+        return "broker";
+    case doris::TFileType::FILE_HDFS:
+    default:
+        if (!uri.scheme.empty() || !uri.authority.empty()) {
+            return uri.scheme + "://" + uri.authority;
+        }
+        return default_fs_name;
+    }
+}
+
+paimon::Status to_paimon_status(const doris::Status& status) {
+    if (status.ok()) {
+        return paimon::Status::OK();
+    }
+    switch (status.code()) {
+    case doris::ErrorCode::NOT_FOUND:
+    case doris::ErrorCode::DIR_NOT_EXIST:
+        return paimon::Status::NotExist(status.to_string());
+    case doris::ErrorCode::ALREADY_EXIST:
+    case doris::ErrorCode::FILE_ALREADY_EXIST:
+        return paimon::Status::Exist(status.to_string());
+    case doris::ErrorCode::INVALID_ARGUMENT:
+    case doris::ErrorCode::INVALID_INPUT_SYNTAX:
+        return paimon::Status::Invalid(status.to_string());
+    case doris::ErrorCode::NOT_IMPLEMENTED_ERROR:
+        return paimon::Status::NotImplemented(status.to_string());
+    default:
+        return paimon::Status::IOError(status.to_string());
+    }
+}
+
+std::string join_path(const std::string& base, const std::string& child) {
+    if (base.empty()) {
+        return child;
+    }
+    if (base.back() == '/') {
+        return base + child;
+    }
+    return base + "/" + child;
+}
+
+std::string parent_path_no_scheme(const std::string& path) {
+    if (path.empty()) {
+        return "";
+    }
+    size_t end = path.size();
+    while (end > 1 && path[end - 1] == '/') {
+        --end;
+    }
+    size_t pos = path.rfind('/', end - 1);
+    if (pos == std::string::npos) {
+        return "";
+    }
+    if (pos == 0) {
+        return "/";
+    }
+    return path.substr(0, pos);
+}
+
+std::string parent_path(const std::string& path) {
+    ParsedUri uri = parse_uri(path);
+    if (uri.scheme.empty()) {
+        return parent_path_no_scheme(path);
+    }
+    size_t scheme_pos = path.find("://");
+    size_t delim_len = 3;
+    if (scheme_pos == std::string::npos) {
+        scheme_pos = path.find(":/");
+        delim_len = 2;
+    }
+    if (scheme_pos == std::string::npos) {
+        return parent_path_no_scheme(path);
+    }
+    size_t start = scheme_pos + delim_len;
+    size_t slash = path.find('/', start);
+    if (slash == std::string::npos) {
+        return "";
+    }
+    std::string path_part = path.substr(slash);
+    std::string parent_part = parent_path_no_scheme(path_part);
+    if (parent_part.empty()) {
+        return "";
+    }
+    std::string prefix = uri.scheme + "://";
+    if (!uri.authority.empty()) {
+        prefix += uri.authority;
+    }
+    return prefix + parent_part;
+}
+
+class DorisInputStream : public InputStream {
+public:
+    DorisInputStream(doris::io::FileReaderSPtr reader, std::string path)
+            : reader_(std::move(reader)), path_(std::move(path)) {}
+
+    Status Seek(int64_t offset, SeekOrigin origin) override {
+        int64_t target = 0;
+        if (origin == SeekOrigin::FS_SEEK_SET) {
+            target = offset;
+        } else if (origin == SeekOrigin::FS_SEEK_CUR) {
+            target = position_ + offset;
+        } else if (origin == SeekOrigin::FS_SEEK_END) {
+            target = static_cast<int64_t>(reader_->size()) + offset;
+        } else {
+            return Status::Invalid("unknown seek origin");
+        }
+        if (target < 0) {
+            return Status::Invalid("seek position is negative");
+        }
+        position_ = target;
+        return Status::OK();
+    }
+
+    Result<int64_t> GetPos() const override { return position_; }
+
+    Result<int32_t> Read(char* buffer, uint32_t size) override {
+        size_t bytes_read = 0;
+        doris::Status status = reader_->read_at(position_, 
doris::Slice(buffer, size), &bytes_read);
+        if (!status.ok()) {
+            return to_paimon_status(status);
+        }
+        position_ += static_cast<int64_t>(bytes_read);
+        return static_cast<int32_t>(bytes_read);
+    }
+
+    Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset) 
override {
+        size_t bytes_read = 0;
+        doris::Status status = reader_->read_at(offset, doris::Slice(buffer, 
size), &bytes_read);
+        if (!status.ok()) {
+            return to_paimon_status(status);
+        }
+        return static_cast<int32_t>(bytes_read);
+    }
+
+    void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
+                   std::function<void(Status)>&& callback) override {
+        Result<int32_t> result = Read(buffer, size, offset);
+        Status status = Status::OK();
+        if (!result.ok()) {
+            status = result.status();
+        }
+        callback(status);
+    }
+
+    Result<std::string> GetUri() const override { return path_; }
+
+    Result<uint64_t> Length() const override { return 
static_cast<uint64_t>(reader_->size()); }
+
+    Status Close() override { return to_paimon_status(reader_->close()); }
+
+private:
+    doris::io::FileReaderSPtr reader_;
+    std::string path_;
+    int64_t position_ = 0;
+};
+
+class DorisOutputStream : public OutputStream {
+public:
+    DorisOutputStream(doris::io::FileWriterPtr writer, std::string path)
+            : writer_(std::move(writer)), path_(std::move(path)) {}
+
+    Result<int32_t> Write(const char* buffer, uint32_t size) override {
+        doris::Status status = writer_->append(doris::Slice(buffer, size));
+        if (!status.ok()) {
+            return to_paimon_status(status);
+        }
+        return static_cast<int32_t>(size);
+    }
+
+    Status Flush() override { return Status::OK(); }
+
+    Result<int64_t> GetPos() const override {
+        return static_cast<int64_t>(writer_->bytes_appended());
+    }
+
+    Result<std::string> GetUri() const override { return path_; }
+
+    Status Close() override { return to_paimon_status(writer_->close()); }
+
+private:
+    doris::io::FileWriterPtr writer_;
+    std::string path_;
+};
+
+class DorisBasicFileStatus : public BasicFileStatus {
+public:
+    DorisBasicFileStatus(std::string path, bool is_dir) : 
path_(std::move(path)), is_dir_(is_dir) {}
+
+    bool IsDir() const override { return is_dir_; }
+    std::string GetPath() const override { return path_; }
+
+private:
+    std::string path_;
+    bool is_dir_;
+};
+
+class DorisFileStatus : public FileStatus {
+public:
+    DorisFileStatus(std::string path, bool is_dir, uint64_t length, int64_t 
mtime)
+            : path_(std::move(path)), is_dir_(is_dir), length_(length), 
mtime_(mtime) {}
+
+    uint64_t GetLen() const override { return length_; }
+    bool IsDir() const override { return is_dir_; }
+    std::string GetPath() const override { return path_; }
+    int64_t GetModificationTime() const override { return mtime_; }
+
+private:
+    std::string path_;
+    bool is_dir_;
+    uint64_t length_;
+    int64_t mtime_;
+};
+
+class DorisFileSystem : public FileSystem {
+public:
+    explicit DorisFileSystem(std::map<std::string, std::string> options)
+            : options_(std::move(options)) {
+        auto it = options_.find("fs.defaultFS");
+        if (it != options_.end()) {
+            default_fs_name_ = it->second;
+        }
+    }
+
+    Result<std::unique_ptr<InputStream>> Open(const std::string& path) const 
override {
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path));
+        auto& fs = resolved.first;
+        auto& normalized_path = resolved.second;
+        doris::io::FileReaderSPtr reader;
+        doris::io::FileReaderOptions reader_options = 
doris::io::FileReaderOptions::DEFAULT;
+        doris::Status status = fs->open_file(normalized_path, &reader, 
&reader_options);
+        if (!status.ok()) {
+            return to_paimon_status(status);
+        }
+        return std::make_unique<DorisInputStream>(std::move(reader), 
normalized_path);
+    }
+
+    Result<std::unique_ptr<OutputStream>> Create(const std::string& path,
+                                                 bool overwrite) const 
override {
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path));
+        auto& fs = resolved.first;
+        auto& normalized_path = resolved.second;
+        if (!overwrite) {
+            bool exists = false;
+            doris::Status exists_status = fs->exists(normalized_path, &exists);
+            if (!exists_status.ok()) {
+                return to_paimon_status(exists_status);
+            }
+            if (exists) {
+                return Status::Exist("file already exists: ", normalized_path);
+            }
+        }
+        std::string parent = parent_path(normalized_path);
+        if (!parent.empty()) {
+            doris::Status mkdir_status = fs->create_directory(parent);
+            if (!mkdir_status.ok()) {
+                return to_paimon_status(mkdir_status);
+            }
+        }
+        doris::io::FileWriterPtr writer;
+        doris::Status status = fs->create_file(normalized_path, &writer);
+        if (!status.ok()) {
+            return to_paimon_status(status);
+        }
+        return std::make_unique<DorisOutputStream>(std::move(writer), 
normalized_path);
+    }
+
+    Status Mkdirs(const std::string& path) const override {
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path));
+        doris::Status status = 
resolved.first->create_directory(resolved.second);
+        return to_paimon_status(status);
+    }
+
+    Status Rename(const std::string& src, const std::string& dst) const 
override {
+        PAIMON_ASSIGN_OR_RAISE(auto src_resolved, resolve_path(src));
+        PAIMON_ASSIGN_OR_RAISE(auto dst_resolved, resolve_path(dst));
+        doris::Status status = src_resolved.first->rename(src_resolved.second, 
dst_resolved.second);
+        return to_paimon_status(status);
+    }
+
+    Status Delete(const std::string& path, bool recursive = true) const 
override {
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path));
+        bool exists = false;
+        doris::Status exists_status = resolved.first->exists(resolved.second, 
&exists);
+        if (!exists_status.ok()) {
+            return to_paimon_status(exists_status);
+        }
+        if (!exists) {
+            return Status::OK();
+        }
+        int64_t size = 0;
+        doris::Status size_status = resolved.first->file_size(resolved.second, 
&size);
+        if (size_status.ok()) {
+            return 
to_paimon_status(resolved.first->delete_file(resolved.second));
+        }
+        if (recursive) {
+            return 
to_paimon_status(resolved.first->delete_directory(resolved.second));
+        }
+        return to_paimon_status(size_status);
+    }
+
+    Result<std::unique_ptr<FileStatus>> GetFileStatus(const std::string& path) 
const override {
+        ParsedUri uri = parse_uri(path);
+        doris::TFileType::type type = map_scheme_to_file_type(uri.scheme);
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path));
+        bool exists = false;
+        doris::Status exists_status = resolved.first->exists(resolved.second, 
&exists);
+        if (!exists_status.ok()) {
+            return to_paimon_status(exists_status);
+        }
+        if (!exists) {
+            if (type != doris::TFileType::FILE_S3) {
+                return Status::NotExist("path not exists: ", resolved.second);
+            }
+            std::vector<doris::io::FileInfo> files;
+            bool list_exists = false;
+            doris::Status list_status =
+                    resolved.first->list(resolved.second, false, &files, 
&list_exists);
+            if (!list_status.ok()) {
+                return to_paimon_status(list_status);
+            }
+            if (!list_exists && files.empty()) {
+                return Status::NotExist("path not exists: ", resolved.second);
+            }
+            return std::make_unique<DorisFileStatus>(resolved.second, true, 0, 
0);
+        }
+        int64_t size = 0;
+        doris::Status size_status = resolved.first->file_size(resolved.second, 
&size);
+        if (size_status.ok()) {
+            return std::make_unique<DorisFileStatus>(resolved.second, false,
+                                                     
static_cast<uint64_t>(size), 0);
+        }
+        std::vector<doris::io::FileInfo> files;
+        bool list_exists = false;
+        doris::Status list_status =
+                resolved.first->list(resolved.second, false, &files, 
&list_exists);
+        if (!list_status.ok()) {
+            return to_paimon_status(list_status);
+        }
+        if (!list_exists && files.empty()) {
+            return Status::NotExist("path not exists: ", resolved.second);
+        }
+        return std::make_unique<DorisFileStatus>(resolved.second, true, 0, 0);
+    }
+
+    Status ListDir(const std::string& directory,
+                   std::vector<std::unique_ptr<BasicFileStatus>>* status_list) 
const override {
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(directory));
+        auto file_status = GetFileStatus(directory);
+        if (file_status.ok() && !file_status.value()->IsDir()) {
+            return Status::IOError("path is not a directory: ", directory);
+        }
+        std::vector<doris::io::FileInfo> files;
+        bool exists = false;
+        doris::Status status = resolved.first->list(resolved.second, false, 
&files, &exists);
+        if (!status.ok()) {
+            return to_paimon_status(status);
+        }
+        if (!exists) {
+            return Status::OK();
+        }
+        status_list->reserve(status_list->size() + files.size());
+        for (const auto& file : files) {
+            status_list->emplace_back(std::make_unique<DorisBasicFileStatus>(
+                    join_path(resolved.second, file.file_name), 
!file.is_file));
+        }
+        return Status::OK();
+    }
+
+    Status ListFileStatus(const std::string& path,
+                          std::vector<std::unique_ptr<FileStatus>>* 
status_list) const override {
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path));
+        auto self_status = GetFileStatus(path);
+        if (!self_status.ok()) {
+            if (self_status.status().IsNotExist()) {
+                return Status::OK();
+            }
+            return self_status.status();
+        }
+        if (!self_status.value()->IsDir()) {
+            status_list->emplace_back(std::move(self_status).value());
+            return Status::OK();
+        }
+        std::vector<doris::io::FileInfo> files;
+        bool exists = false;
+        doris::Status list_status = resolved.first->list(resolved.second, 
false, &files, &exists);
+        if (!list_status.ok()) {
+            return to_paimon_status(list_status);
+        }
+        if (!exists) {
+            return Status::OK();
+        }
+        status_list->reserve(status_list->size() + files.size());
+        for (const auto& file : files) {
+            uint64_t length = file.is_file ? 
static_cast<uint64_t>(file.file_size) : 0;
+            status_list->emplace_back(std::make_unique<DorisFileStatus>(
+                    join_path(resolved.second, file.file_name), !file.is_file, 
length, 0));
+        }
+        return Status::OK();
+    }
+
+    Result<bool> Exists(const std::string& path) const override {
+        ParsedUri uri = parse_uri(path);
+        doris::TFileType::type type = map_scheme_to_file_type(uri.scheme);
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path));
+        bool exists = false;
+        doris::Status status = resolved.first->exists(resolved.second, 
&exists);
+        if (!status.ok()) {
+            return to_paimon_status(status);
+        }
+        if (!exists && type == doris::TFileType::FILE_S3) {
+            std::vector<doris::io::FileInfo> files;
+            bool list_exists = false;
+            doris::Status list_status =
+                    resolved.first->list(resolved.second, false, &files, 
&list_exists);
+            if (!list_status.ok()) {
+                return to_paimon_status(list_status);
+            }
+            return list_exists || !files.empty();
+        }
+        return exists;
+    }
+
+private:
+    Result<std::pair<doris::io::FileSystemSPtr, std::string>> resolve_path(
+            const std::string& path) const {
+        auto uri = parse_uri(path);
+        doris::TFileType::type type = map_scheme_to_file_type(uri.scheme);
+        std::string normalized_path = normalize_path_for_type(path, 
uri.scheme, type);
+        if (type == doris::TFileType::FILE_LOCAL) {
+            doris::io::FileSystemSPtr fs = 
doris::io::global_local_filesystem();
+            return std::make_pair(std::move(fs), normalized_path);
+        }
+        std::string fs_key = build_fs_cache_key(type, uri, default_fs_name_);
+        {
+            std::lock_guard lock(fs_lock_);
+            auto it = fs_cache_.find(fs_key);
+            if (it != fs_cache_.end()) {
+                return std::make_pair(it->second, normalized_path);
+            }
+        }
+        doris::io::FSPropertiesRef fs_properties(type);
+        const std::map<std::string, std::string>* properties = &options_;
+        std::map<std::string, std::string> properties_override;
+        if (type == doris::TFileType::FILE_HTTP && !options_.contains("uri") &&
+            !uri.scheme.empty()) {
+            properties_override = options_;
+            properties_override["uri"] = uri.scheme + "://" + uri.authority;
+            properties = &properties_override;
+        }
+        fs_properties.properties = properties;
+        if (!broker_addresses_.empty()) {
+            fs_properties.broker_addresses = &broker_addresses_;
+        }
+        doris::io::FileDescription file_description = {
+                .path = normalized_path, .file_size = -1, .mtime = 0, .fs_name 
= default_fs_name_};
+        auto fs_result = doris::FileFactory::create_fs(fs_properties, 
file_description);
+        if (!fs_result.has_value()) {
+            return to_paimon_status(fs_result.error());
+        }
+        doris::io::FileSystemSPtr fs = std::move(fs_result).value();
+        {
+            std::lock_guard lock(fs_lock_);
+            fs_cache_.emplace(std::move(fs_key), fs);
+        }
+        return std::make_pair(std::move(fs), std::move(normalized_path));
+    }
+
+    std::map<std::string, std::string> options_;
+    std::vector<doris::TNetworkAddress> broker_addresses_;
+    std::string default_fs_name_;
+    mutable std::mutex fs_lock_;
+    mutable std::unordered_map<std::string, doris::io::FileSystemSPtr> 
fs_cache_;
+};
+
+class DorisFileSystemFactory : public FileSystemFactory {
+public:
+    static const char IDENTIFIER[];
+
+    const char* Identifier() const override { return IDENTIFIER; }
+
+    Result<std::unique_ptr<FileSystem>> Create(
+            const std::string& path,
+            const std::map<std::string, std::string>& options) const override {
+        return std::make_unique<DorisFileSystem>(options);
+    }
+};
+
+const char DorisFileSystemFactory::IDENTIFIER[] = "doris";
+
+REGISTER_PAIMON_FACTORY(DorisFileSystemFactory);
+
+} // namespace paimon
+
+namespace doris::vectorized {
+
+void register_paimon_doris_file_system() {}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/paimon_doris_file_system.h 
b/be/src/vec/exec/format/table/paimon_doris_file_system.h
new file mode 100644
index 00000000000..22552c6eb6f
--- /dev/null
+++ b/be/src/vec/exec/format/table/paimon_doris_file_system.h
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+namespace doris::vectorized {
+
+// Force-link helper so the paimon-cpp file system factory registration is 
kept.
+void register_paimon_doris_file_system();
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/format/table/paimon_predicate_converter.cpp 
b/be/src/vec/exec/format/table/paimon_predicate_converter.cpp
new file mode 100644
index 00000000000..6c8251ddb43
--- /dev/null
+++ b/be/src/vec/exec/format/table/paimon_predicate_converter.cpp
@@ -0,0 +1,659 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/table/paimon_predicate_converter.h"
+
+#include <algorithm>
+#include <cctype>
+#include <utility>
+
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "runtime/decimalv2_value.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
+#include "util/timezone_utils.h"
+#include "vec/columns/column_const.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/exprs/vcompound_pred.h"
+#include "vec/exprs/vdirect_in_predicate.h"
+#include "vec/exprs/vectorized_fn_call.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vin_predicate.h"
+#include "vec/exprs/vliteral.h"
+#include "vec/exprs/vslot_ref.h"
+#include "vec/runtime/timestamptz_value.h"
+#include "vec/runtime/vdatetime_value.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+PaimonPredicateConverter::PaimonPredicateConverter(
+        const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* 
state)
+        : _state(state) {
+    _field_index_by_name.reserve(file_slot_descs.size());
+    for (size_t i = 0; i < file_slot_descs.size(); ++i) {
+        const auto& name = file_slot_descs[i]->col_name();
+        auto normalized = _normalize_name(name);
+        if (_field_index_by_name.find(normalized) == 
_field_index_by_name.end()) {
+            _field_index_by_name.emplace(std::move(normalized), 
static_cast<int32_t>(i));
+        }
+    }
+
+    if (!TimezoneUtils::find_cctz_time_zone("GMT", _gmt_tz)) {
+        TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, 
_gmt_tz);
+    }
+}
+
+std::shared_ptr<paimon::Predicate> PaimonPredicateConverter::build(
+        const VExprContextSPtrs& conjuncts) {
+    std::vector<std::shared_ptr<paimon::Predicate>> predicates;
+    predicates.reserve(conjuncts.size());
+    for (const auto& conjunct : conjuncts) {
+        if (!conjunct || !conjunct->root()) {
+            continue;
+        }
+        auto root = conjunct->root();
+        if (root->is_rf_wrapper()) {
+            if (auto impl = root->get_impl()) {
+                root = impl;
+            }
+        }
+        auto predicate = _convert_expr(root);
+        if (predicate) {
+            predicates.emplace_back(std::move(predicate));
+        }
+    }
+
+    if (predicates.empty()) {
+        return nullptr;
+    }
+    if (predicates.size() == 1) {
+        return predicates.front();
+    }
+    auto and_result = paimon::PredicateBuilder::And(predicates);
+    if (!and_result.ok()) {
+        return nullptr;
+    }
+    return std::move(and_result).value();
+}
+
+std::shared_ptr<paimon::Predicate> 
PaimonPredicateConverter::_convert_expr(const VExprSPtr& expr) {
+    if (!expr) {
+        return nullptr;
+    }
+
+    auto uncast = VExpr::expr_without_cast(expr);
+
+    if (auto* direct_in = dynamic_cast<VDirectInPredicate*>(uncast.get())) {
+        VExprSPtr in_expr;
+        if (direct_in->get_slot_in_expr(in_expr)) {
+            return _convert_in(in_expr);
+        }
+        return nullptr;
+    }
+
+    if (dynamic_cast<VInPredicate*>(uncast.get()) != nullptr) {
+        return _convert_in(uncast);
+    }
+
+    switch (uncast->op()) {
+    case TExprOpcode::COMPOUND_AND:
+    case TExprOpcode::COMPOUND_OR:
+        return _convert_compound(uncast);
+    case TExprOpcode::COMPOUND_NOT:
+        return nullptr;
+    case TExprOpcode::EQ:
+    case TExprOpcode::EQ_FOR_NULL:
+    case TExprOpcode::NE:
+    case TExprOpcode::GE:
+    case TExprOpcode::GT:
+    case TExprOpcode::LE:
+    case TExprOpcode::LT:
+        return _convert_binary(uncast);
+    default:
+        break;
+    }
+
+    if (auto* fn = dynamic_cast<VectorizedFnCall*>(uncast.get())) {
+        auto fn_name = _normalize_name(fn->function_name());
+        if (fn_name == "is_null_pred" || fn_name == "is_not_null_pred") {
+            return _convert_is_null(uncast, fn_name);
+        }
+        if (fn_name == "like") {
+            return _convert_like_prefix(uncast);
+        }
+    }
+
+    return nullptr;
+}
+
+std::shared_ptr<paimon::Predicate> PaimonPredicateConverter::_convert_compound(
+        const VExprSPtr& expr) {
+    if (!expr || expr->get_num_children() != 2) {
+        return nullptr;
+    }
+    auto left = _convert_expr(expr->get_child(0));
+    if (!left) {
+        return nullptr;
+    }
+    auto right = _convert_expr(expr->get_child(1));
+    if (!right) {
+        return nullptr;
+    }
+
+    if (expr->op() == TExprOpcode::COMPOUND_AND) {
+        auto and_result = paimon::PredicateBuilder::And({left, right});
+        return and_result.ok() ? std::move(and_result).value() : nullptr;
+    }
+    if (expr->op() == TExprOpcode::COMPOUND_OR) {
+        auto or_result = paimon::PredicateBuilder::Or({left, right});
+        return or_result.ok() ? std::move(or_result).value() : nullptr;
+    }
+    return nullptr;
+}
+
+std::shared_ptr<paimon::Predicate> PaimonPredicateConverter::_convert_in(const 
VExprSPtr& expr) {
+    auto* in_pred = dynamic_cast<VInPredicate*>(expr.get());
+    if (!in_pred || expr->get_num_children() < 2) {
+        return nullptr;
+    }
+    auto field_meta = _resolve_field(expr->get_child(0));
+    if (!field_meta) {
+        return nullptr;
+    }
+
+    std::vector<paimon::Literal> literals;
+    literals.reserve(expr->get_num_children() - 1);
+    for (uint16_t i = 1; i < expr->get_num_children(); ++i) {
+        auto literal = _convert_literal(expr->get_child(i), 
*field_meta->slot_desc,
+                                        field_meta->field_type);
+        if (!literal) {
+            return nullptr;
+        }
+        literals.emplace_back(std::move(*literal));
+    }
+
+    if (literals.empty()) {
+        return nullptr;
+    }
+    if (in_pred->is_not_in()) {
+        return paimon::PredicateBuilder::NotIn(field_meta->index, 
field_meta->slot_desc->col_name(),
+                                               field_meta->field_type, 
literals);
+    }
+    return paimon::PredicateBuilder::In(field_meta->index, 
field_meta->slot_desc->col_name(),
+                                        field_meta->field_type, literals);
+}
+
+std::shared_ptr<paimon::Predicate> PaimonPredicateConverter::_convert_binary(
+        const VExprSPtr& expr) {
+    if (!expr || expr->get_num_children() != 2) {
+        return nullptr;
+    }
+    auto field_meta = _resolve_field(expr->get_child(0));
+    if (!field_meta) {
+        return nullptr;
+    }
+
+    if (expr->op() == TExprOpcode::EQ_FOR_NULL) {
+        return paimon::PredicateBuilder::IsNull(
+                field_meta->index, field_meta->slot_desc->col_name(), 
field_meta->field_type);
+    }
+
+    auto literal =
+            _convert_literal(expr->get_child(1), *field_meta->slot_desc, 
field_meta->field_type);
+    if (!literal) {
+        return nullptr;
+    }
+
+    switch (expr->op()) {
+    case TExprOpcode::EQ:
+        return paimon::PredicateBuilder::Equal(field_meta->index, 
field_meta->slot_desc->col_name(),
+                                               field_meta->field_type, 
*literal);
+    case TExprOpcode::NE:
+        return paimon::PredicateBuilder::NotEqual(field_meta->index,
+                                                  
field_meta->slot_desc->col_name(),
+                                                  field_meta->field_type, 
*literal);
+    case TExprOpcode::GE:
+        return paimon::PredicateBuilder::GreaterOrEqual(field_meta->index,
+                                                        
field_meta->slot_desc->col_name(),
+                                                        
field_meta->field_type, *literal);
+    case TExprOpcode::GT:
+        return paimon::PredicateBuilder::GreaterThan(field_meta->index,
+                                                     
field_meta->slot_desc->col_name(),
+                                                     field_meta->field_type, 
*literal);
+    case TExprOpcode::LE:
+        return paimon::PredicateBuilder::LessOrEqual(field_meta->index,
+                                                     
field_meta->slot_desc->col_name(),
+                                                     field_meta->field_type, 
*literal);
+    case TExprOpcode::LT:
+        return paimon::PredicateBuilder::LessThan(field_meta->index,
+                                                  
field_meta->slot_desc->col_name(),
+                                                  field_meta->field_type, 
*literal);
+    default:
+        break;
+    }
+    return nullptr;
+}
+
+std::shared_ptr<paimon::Predicate> PaimonPredicateConverter::_convert_is_null(
+        const VExprSPtr& expr, const std::string& fn_name) {
+    if (!expr || expr->get_num_children() != 1) {
+        return nullptr;
+    }
+    auto field_meta = _resolve_field(expr->get_child(0));
+    if (!field_meta) {
+        return nullptr;
+    }
+    if (fn_name == "is_not_null_pred") {
+        return paimon::PredicateBuilder::IsNotNull(
+                field_meta->index, field_meta->slot_desc->col_name(), 
field_meta->field_type);
+    }
+    return paimon::PredicateBuilder::IsNull(field_meta->index, 
field_meta->slot_desc->col_name(),
+                                            field_meta->field_type);
+}
+
+std::shared_ptr<paimon::Predicate> 
PaimonPredicateConverter::_convert_like_prefix(
+        const VExprSPtr& expr) {
+    if (!expr || expr->get_num_children() != 2) {
+        return nullptr;
+    }
+    auto field_meta = _resolve_field(expr->get_child(0));
+    if (!field_meta || field_meta->field_type != paimon::FieldType::STRING) {
+        return nullptr;
+    }
+
+    auto pattern_opt = _extract_string_literal(expr->get_child(1));
+    if (!pattern_opt) {
+        return nullptr;
+    }
+    const std::string& pattern = *pattern_opt;
+    if (!pattern.empty() && pattern.front() == '%') {
+        return nullptr;
+    }
+    if (pattern.empty() || pattern.back() != '%') {
+        return nullptr;
+    }
+
+    std::string prefix = pattern.substr(0, pattern.size() - 1);
+    paimon::Literal lower_literal(paimon::FieldType::STRING, prefix.data(), 
prefix.size());
+    auto lower_pred = paimon::PredicateBuilder::GreaterOrEqual(
+            field_meta->index, field_meta->slot_desc->col_name(), 
field_meta->field_type,
+            lower_literal);
+
+    auto upper_prefix = _next_prefix(prefix);
+    if (!upper_prefix) {
+        return lower_pred;
+    }
+
+    paimon::Literal upper_literal(paimon::FieldType::STRING, 
upper_prefix->data(),
+                                  upper_prefix->size());
+    auto upper_pred =
+            paimon::PredicateBuilder::LessThan(field_meta->index, 
field_meta->slot_desc->col_name(),
+                                               field_meta->field_type, 
upper_literal);
+    auto and_result = paimon::PredicateBuilder::And({lower_pred, upper_pred});
+    return and_result.ok() ? std::move(and_result).value() : nullptr;
+}
+
+std::optional<PaimonPredicateConverter::FieldMeta> 
PaimonPredicateConverter::_resolve_field(
+        const VExprSPtr& expr) const {
+    if (!_state || !expr) {
+        return std::nullopt;
+    }
+    auto slot_expr = VExpr::expr_without_cast(expr);
+    auto* slot_ref = dynamic_cast<VSlotRef*>(slot_expr.get());
+    if (!slot_ref) {
+        return std::nullopt;
+    }
+    auto* slot_desc = 
_state->desc_tbl().get_slot_descriptor(slot_ref->slot_id());
+    if (!slot_desc) {
+        return std::nullopt;
+    }
+    auto normalized = _normalize_name(slot_desc->col_name());
+    auto it = _field_index_by_name.find(normalized);
+    if (it == _field_index_by_name.end()) {
+        return std::nullopt;
+    }
+    auto slot_type = slot_desc->type();
+    auto field_type =
+            _to_paimon_field_type(slot_type->get_primitive_type(), 
slot_type->get_precision());
+    if (!field_type) {
+        return std::nullopt;
+    }
+    return FieldMeta {it->second, *field_type, slot_desc};
+}
+
+std::optional<paimon::Literal> PaimonPredicateConverter::_convert_literal(
+        const VExprSPtr& expr, const SlotDescriptor& slot_desc,
+        paimon::FieldType field_type) const {
+    auto literal_expr = VExpr::expr_without_cast(expr);
+    auto* literal = dynamic_cast<VLiteral*>(literal_expr.get());
+    if (!literal) {
+        return std::nullopt;
+    }
+
+    auto literal_type = remove_nullable(literal->get_data_type());
+    PrimitiveType literal_primitive = literal_type->get_primitive_type();
+    PrimitiveType slot_primitive = slot_desc.type()->get_primitive_type();
+
+    ColumnPtr col = 
literal->get_column_ptr()->convert_to_full_column_if_const();
+    if (const auto* nullable = check_and_get_column<ColumnNullable>(*col)) {
+        if (nullable->is_null_at(0)) {
+            return std::nullopt;
+        }
+        col = nullable->get_nested_column_ptr();
+    }
+
+    Field field;
+    col->get(0, field);
+
+    switch (slot_primitive) {
+    case TYPE_BOOLEAN: {
+        if (literal_primitive != TYPE_BOOLEAN) {
+            return std::nullopt;
+        }
+        return paimon::Literal(static_cast<bool>(field.get<TYPE_BOOLEAN>()));
+    }
+    case TYPE_TINYINT:
+    case TYPE_SMALLINT:
+    case TYPE_INT:
+    case TYPE_BIGINT: {
+        if (!_is_integer_type(literal_primitive)) {
+            return std::nullopt;
+        }
+        int64_t value = 0;
+        switch (literal_primitive) {
+        case TYPE_TINYINT:
+            value = field.get<TYPE_TINYINT>();
+            break;
+        case TYPE_SMALLINT:
+            value = field.get<TYPE_SMALLINT>();
+            break;
+        case TYPE_INT:
+            value = field.get<TYPE_INT>();
+            break;
+        case TYPE_BIGINT:
+            value = field.get<TYPE_BIGINT>();
+            break;
+        default:
+            return std::nullopt;
+        }
+        if (slot_primitive == TYPE_TINYINT) {
+            return paimon::Literal(static_cast<int8_t>(value));
+        }
+        if (slot_primitive == TYPE_SMALLINT) {
+            return paimon::Literal(static_cast<int16_t>(value));
+        }
+        if (slot_primitive == TYPE_INT) {
+            return paimon::Literal(static_cast<int32_t>(value));
+        }
+        return paimon::Literal(static_cast<int64_t>(value));
+    }
+    case TYPE_DOUBLE: {
+        if (literal_primitive != TYPE_DOUBLE && literal_primitive != 
TYPE_FLOAT) {
+            return std::nullopt;
+        }
+        double value = 0;
+        if (literal_primitive == TYPE_FLOAT) {
+            value = static_cast<double>(field.get<TYPE_FLOAT>());
+        } else {
+            value = field.get<TYPE_DOUBLE>();
+        }
+        return paimon::Literal(value);
+    }
+    case TYPE_DATE:
+    case TYPE_DATEV2: {
+        if (!_is_date_type(literal_primitive)) {
+            return std::nullopt;
+        }
+        int64_t seconds = 0;
+        if (literal_primitive == TYPE_DATE) {
+            const auto& dt = field.get<TYPE_DATE>();
+            if (!dt.is_valid_date()) {
+                return std::nullopt;
+            }
+            dt.unix_timestamp(&seconds, _gmt_tz);
+        } else if (literal_primitive == TYPE_DATEV2) {
+            const auto& dt = field.get<TYPE_DATEV2>();
+            if (!dt.is_valid_date()) {
+                return std::nullopt;
+            }
+            dt.unix_timestamp(&seconds, _gmt_tz);
+        }
+        int32_t days = _seconds_to_days(seconds);
+        return paimon::Literal(paimon::FieldType::DATE, days);
+    }
+    case TYPE_DATETIME:
+    case TYPE_DATETIMEV2: {
+        if (!_is_datetime_type(literal_primitive)) {
+            return std::nullopt;
+        }
+        if (literal_primitive == TYPE_DATETIME) {
+            const auto& dt = field.get<TYPE_DATETIME>();
+            if (!dt.is_valid_date()) {
+                return std::nullopt;
+            }
+            int64_t seconds = 0;
+            dt.unix_timestamp(&seconds, _gmt_tz);
+            return paimon::Literal(paimon::Timestamp::FromEpochMillis(seconds 
* 1000));
+        }
+        std::pair<int64_t, int64_t> ts;
+        const auto& dt = field.get<TYPE_DATETIMEV2>();
+        if (!dt.is_valid_date()) {
+            return std::nullopt;
+        }
+        dt.unix_timestamp(&ts, _gmt_tz);
+        int64_t millis = ts.first * 1000 + ts.second / 1000;
+        return paimon::Literal(paimon::Timestamp::FromEpochMillis(millis));
+    }
+    case TYPE_VARCHAR:
+    case TYPE_STRING: {
+        if (!_is_string_type(literal_primitive)) {
+            return std::nullopt;
+        }
+        const auto& value = field.get<TYPE_STRING>();
+        return paimon::Literal(field_type, value.data(), value.size());
+    }
+    case TYPE_DECIMALV2:
+    case TYPE_DECIMAL32:
+    case TYPE_DECIMAL64:
+    case TYPE_DECIMAL128I:
+    case TYPE_DECIMAL256: {
+        if (!_is_decimal_type(literal_primitive)) {
+            return std::nullopt;
+        }
+        int32_t precision = 
static_cast<int32_t>(literal_type->get_precision());
+        int32_t scale = static_cast<int32_t>(literal_type->get_scale());
+        if (precision <= 0 || precision > paimon::Decimal::MAX_PRECISION) {
+            return std::nullopt;
+        }
+
+        paimon::Decimal::int128_t value = 0;
+        switch (literal_primitive) {
+        case TYPE_DECIMALV2: {
+            const auto& dec = field.get<TYPE_DECIMALV2>();
+            value = dec.value();
+            break;
+        }
+        case TYPE_DECIMAL32: {
+            const auto& dec = field.get<TYPE_DECIMAL32>();
+            value = dec.value;
+            break;
+        }
+        case TYPE_DECIMAL64: {
+            const auto& dec = field.get<TYPE_DECIMAL64>();
+            value = dec.value;
+            break;
+        }
+        case TYPE_DECIMAL128I: {
+            const auto& dec = field.get<TYPE_DECIMAL128I>();
+            value = dec.value;
+            break;
+        }
+        default:
+            return std::nullopt;
+        }
+        return paimon::Literal(paimon::Decimal(precision, scale, value));
+    }
+    default:
+        break;
+    }
+    return std::nullopt;
+}
+
+std::optional<std::string> PaimonPredicateConverter::_extract_string_literal(
+        const VExprSPtr& expr) const {
+    auto literal_expr = VExpr::expr_without_cast(expr);
+    auto* literal = dynamic_cast<VLiteral*>(literal_expr.get());
+    if (!literal) {
+        return std::nullopt;
+    }
+    auto literal_type = remove_nullable(literal->get_data_type());
+    PrimitiveType literal_primitive = literal_type->get_primitive_type();
+    if (!_is_string_type(literal_primitive)) {
+        return std::nullopt;
+    }
+
+    ColumnPtr col = 
literal->get_column_ptr()->convert_to_full_column_if_const();
+    if (const auto* nullable = check_and_get_column<ColumnNullable>(*col)) {
+        if (nullable->is_null_at(0)) {
+            return std::nullopt;
+        }
+        col = nullable->get_nested_column_ptr();
+    }
+    Field field;
+    col->get(0, field);
+    const auto& value = field.get<TYPE_STRING>();
+    return value;
+}
+
+std::string PaimonPredicateConverter::_normalize_name(std::string_view name) {
+    std::string out(name);
+    std::transform(out.begin(), out.end(), out.begin(),
+                   [](unsigned char c) { return 
static_cast<char>(std::tolower(c)); });
+    return out;
+}
+
+std::optional<std::string> PaimonPredicateConverter::_next_prefix(const 
std::string& prefix) {
+    if (prefix.empty()) {
+        return std::nullopt;
+    }
+    std::string upper = prefix;
+    for (int i = static_cast<int>(upper.size()) - 1; i >= 0; --i) {
+        auto c = static_cast<unsigned char>(upper[i]);
+        if (c != 0xFF) {
+            upper[i] = static_cast<char>(c + 1);
+            upper.resize(i + 1);
+            return upper;
+        }
+    }
+    return std::nullopt;
+}
+
+int32_t PaimonPredicateConverter::_seconds_to_days(int64_t seconds) {
+    static constexpr int64_t kSecondsPerDay = 24 * 60 * 60;
+    int64_t days = seconds / kSecondsPerDay;
+    if (seconds < 0 && seconds % kSecondsPerDay != 0) {
+        --days;
+    }
+    return static_cast<int32_t>(days);
+}
+
+bool PaimonPredicateConverter::_is_integer_type(PrimitiveType type) {
+    switch (type) {
+    case TYPE_TINYINT:
+    case TYPE_SMALLINT:
+    case TYPE_INT:
+    case TYPE_BIGINT:
+        return true;
+    default:
+        return false;
+    }
+}
+
+bool PaimonPredicateConverter::_is_string_type(PrimitiveType type) {
+    return type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_STRING;
+}
+
+bool PaimonPredicateConverter::_is_decimal_type(PrimitiveType type) {
+    switch (type) {
+    case TYPE_DECIMALV2:
+    case TYPE_DECIMAL32:
+    case TYPE_DECIMAL64:
+    case TYPE_DECIMAL128I:
+    case TYPE_DECIMAL256:
+        return true;
+    default:
+        return false;
+    }
+}
+
+bool PaimonPredicateConverter::_is_date_type(PrimitiveType type) {
+    return type == TYPE_DATE || type == TYPE_DATEV2;
+}
+
+bool PaimonPredicateConverter::_is_datetime_type(PrimitiveType type) {
+    return type == TYPE_DATETIME || type == TYPE_DATETIMEV2;
+}
+
+std::optional<paimon::FieldType> 
PaimonPredicateConverter::_to_paimon_field_type(
+        PrimitiveType type, uint32_t precision) {
+    switch (type) {
+    case TYPE_BOOLEAN:
+        return paimon::FieldType::BOOLEAN;
+    case TYPE_TINYINT:
+        return paimon::FieldType::TINYINT;
+    case TYPE_SMALLINT:
+        return paimon::FieldType::SMALLINT;
+    case TYPE_INT:
+        return paimon::FieldType::INT;
+    case TYPE_BIGINT:
+        return paimon::FieldType::BIGINT;
+    case TYPE_DOUBLE:
+        return paimon::FieldType::DOUBLE;
+    case TYPE_VARCHAR:
+    case TYPE_STRING:
+        return paimon::FieldType::STRING;
+    case TYPE_DATE:
+    case TYPE_DATEV2:
+        return paimon::FieldType::DATE;
+    case TYPE_DATETIME:
+    case TYPE_DATETIMEV2:
+        return paimon::FieldType::TIMESTAMP;
+    case TYPE_DECIMALV2:
+    case TYPE_DECIMAL32:
+    case TYPE_DECIMAL64:
+    case TYPE_DECIMAL128I:
+    case TYPE_DECIMAL256:
+        if (precision > 0 && precision > paimon::Decimal::MAX_PRECISION) {
+            return std::nullopt;
+        }
+        return paimon::FieldType::DECIMAL;
+    case TYPE_FLOAT:
+    case TYPE_CHAR:
+    default:
+        return std::nullopt;
+    }
+}
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/paimon_predicate_converter.h 
b/be/src/vec/exec/format/table/paimon_predicate_converter.h
new file mode 100644
index 00000000000..a844b497d52
--- /dev/null
+++ b/be/src/vec/exec/format/table/paimon_predicate_converter.h
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "cctz/time_zone.h"
+#include "paimon/defs.h"
+#include "paimon/predicate/literal.h"
+#include "runtime/define_primitive_type.h"
+#include "vec/exprs/vexpr_fwd.h"
+
+namespace paimon {
+class Predicate;
+} // namespace paimon
+
+namespace doris {
+class RuntimeState;
+class SlotDescriptor;
+} // namespace doris
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+class PaimonPredicateConverter {
+public:
+    PaimonPredicateConverter(const std::vector<SlotDescriptor*>& 
file_slot_descs,
+                             RuntimeState* state);
+
+    std::shared_ptr<paimon::Predicate> build(const VExprContextSPtrs& 
conjuncts);
+
+private:
+    struct FieldMeta {
+        int32_t index = -1;
+        paimon::FieldType field_type = paimon::FieldType::UNKNOWN;
+        const SlotDescriptor* slot_desc = nullptr;
+    };
+
+    std::shared_ptr<paimon::Predicate> _convert_expr(const VExprSPtr& expr);
+    std::shared_ptr<paimon::Predicate> _convert_compound(const VExprSPtr& 
expr);
+    std::shared_ptr<paimon::Predicate> _convert_in(const VExprSPtr& expr);
+    std::shared_ptr<paimon::Predicate> _convert_binary(const VExprSPtr& expr);
+    std::shared_ptr<paimon::Predicate> _convert_is_null(const VExprSPtr& expr,
+                                                        const std::string& 
fn_name);
+    std::shared_ptr<paimon::Predicate> _convert_like_prefix(const VExprSPtr& 
expr);
+
+    std::optional<FieldMeta> _resolve_field(const VExprSPtr& expr) const;
+    std::optional<paimon::Literal> _convert_literal(const VExprSPtr& expr,
+                                                    const SlotDescriptor& 
slot_desc,
+                                                    paimon::FieldType 
field_type) const;
+    std::optional<std::string> _extract_string_literal(const VExprSPtr& expr) 
const;
+
+    static std::string _normalize_name(std::string_view name);
+    static std::optional<std::string> _next_prefix(const std::string& prefix);
+    static int32_t _seconds_to_days(int64_t seconds);
+    static bool _is_integer_type(PrimitiveType type);
+    static bool _is_string_type(PrimitiveType type);
+    static bool _is_decimal_type(PrimitiveType type);
+    static bool _is_date_type(PrimitiveType type);
+    static bool _is_datetime_type(PrimitiveType type);
+    static std::optional<paimon::FieldType> 
_to_paimon_field_type(PrimitiveType type,
+                                                                  uint32_t 
precision);
+
+    std::unordered_map<std::string, int32_t> _field_index_by_name;
+    RuntimeState* _state = nullptr;
+    cctz::time_zone _gmt_tz;
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/file_scanner.cpp 
b/be/src/vec/exec/scan/file_scanner.cpp
index 17170df68f7..97c622cedc2 100644
--- a/be/src/vec/exec/scan/file_scanner.cpp
+++ b/be/src/vec/exec/scan/file_scanner.cpp
@@ -67,7 +67,9 @@
 #include "vec/exec/format/table/iceberg_reader.h"
 #include "vec/exec/format/table/lakesoul_jni_reader.h"
 #include "vec/exec/format/table/max_compute_jni_reader.h"
+#include "vec/exec/format/table/paimon_cpp_reader.h"
 #include "vec/exec/format/table/paimon_jni_reader.h"
+#include "vec/exec/format/table/paimon_predicate_converter.h"
 #include "vec/exec/format/table/paimon_reader.h"
 #include "vec/exec/format/table/remote_doris_reader.h"
 #include "vec/exec/format/table/transactional_hive_reader.h"
@@ -989,9 +991,25 @@ Status FileScanner::_get_next_reader() {
                 _cur_reader = std::move(mc_reader);
             } else if (range.__isset.table_format_params &&
                        range.table_format_params.table_format_type == 
"paimon") {
-                _cur_reader = PaimonJniReader::create_unique(_file_slot_descs, 
_state, _profile,
-                                                             range, _params);
-                init_status = 
((PaimonJniReader*)(_cur_reader.get()))->init_reader();
+                if (_state->query_options().__isset.enable_paimon_cpp_reader &&
+                    _state->query_options().enable_paimon_cpp_reader) {
+                    auto cpp_reader = 
PaimonCppReader::create_unique(_file_slot_descs, _state,
+                                                                     _profile, 
range, _params);
+                    
cpp_reader->set_push_down_agg_type(_get_push_down_agg_type());
+                    if (!_is_load && !_push_down_conjuncts.empty()) {
+                        PaimonPredicateConverter 
predicate_converter(_file_slot_descs, _state);
+                        auto predicate = 
predicate_converter.build(_push_down_conjuncts);
+                        if (predicate) {
+                            cpp_reader->set_predicate(std::move(predicate));
+                        }
+                    }
+                    init_status = cpp_reader->init_reader();
+                    _cur_reader = std::move(cpp_reader);
+                } else {
+                    _cur_reader = 
PaimonJniReader::create_unique(_file_slot_descs, _state, _profile,
+                                                                 range, 
_params);
+                    init_status = 
((PaimonJniReader*)(_cur_reader.get()))->init_reader();
+                }
             } else if (range.__isset.table_format_params &&
                        range.table_format_params.table_format_type == "hudi") {
                 _cur_reader = HudiJniReader::create_unique(*_params,
@@ -1012,8 +1030,9 @@ Status FileScanner::_get_next_reader() {
             }
             // Set col_name_to_block_idx for JNI readers to avoid repeated map 
creation
             if (_cur_reader) {
-                static_cast<JniReader*>(_cur_reader.get())
-                        ->set_col_name_to_block_idx(&_src_block_name_to_idx);
+                if (auto* jni_reader = 
dynamic_cast<JniReader*>(_cur_reader.get())) {
+                    
jni_reader->set_col_name_to_block_idx(&_src_block_name_to_idx);
+                }
             }
             break;
         }
diff --git a/be/test/vec/exec/format/table/paimon_cpp_reader_test.cpp 
b/be/test/vec/exec/format/table/paimon_cpp_reader_test.cpp
new file mode 100644
index 00000000000..95fb22d9b65
--- /dev/null
+++ b/be/test/vec/exec/format/table/paimon_cpp_reader_test.cpp
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/table/paimon_cpp_reader.h"
+
+#include <gtest/gtest.h>
+
+#include <string>
+#include <vector>
+
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+
+namespace doris::vectorized {
+
+class PaimonCppReaderTest : public testing::Test {
+protected:
+    void SetUp() override {
+        _query_options.__set_batch_size(3);
+        _runtime_state = std::make_unique<RuntimeState>(_query_options, 
_query_globals);
+    }
+
+    TFileRangeDesc _build_range_with_table_level_row_count(int64_t row_count) {
+        TFileRangeDesc range;
+        range.__isset.table_format_params = true;
+        range.table_format_params.__isset.table_level_row_count = true;
+        range.table_format_params.table_level_row_count = row_count;
+        return range;
+    }
+
+    TQueryOptions _query_options;
+    TQueryGlobals _query_globals;
+    std::unique_ptr<RuntimeState> _runtime_state;
+    RuntimeProfile _profile {"paimon_cpp_reader_test"};
+    std::vector<SlotDescriptor*> _file_slot_descs;
+};
+
+TEST_F(PaimonCppReaderTest, CountPushDownUsesTableLevelRowCount) {
+    auto range = _build_range_with_table_level_row_count(5);
+    PaimonCppReader reader(_file_slot_descs, _runtime_state.get(), &_profile, 
range, nullptr);
+    reader.set_push_down_agg_type(TPushAggOp::type::COUNT);
+
+    auto init_status = reader.init_reader();
+    ASSERT_TRUE(init_status.ok()) << init_status;
+
+    Block block;
+    size_t read_rows = 0;
+    bool eof = false;
+
+    auto first_status = reader.get_next_block(&block, &read_rows, &eof);
+    ASSERT_TRUE(first_status.ok()) << first_status;
+    EXPECT_EQ(3, read_rows);
+    EXPECT_FALSE(eof);
+
+    auto second_status = reader.get_next_block(&block, &read_rows, &eof);
+    ASSERT_TRUE(second_status.ok()) << second_status;
+    EXPECT_EQ(2, read_rows);
+    EXPECT_TRUE(eof);
+
+    auto third_status = reader.get_next_block(&block, &read_rows, &eof);
+    ASSERT_TRUE(third_status.ok()) << third_status;
+    EXPECT_EQ(0, read_rows);
+    EXPECT_TRUE(eof);
+}
+
+TEST_F(PaimonCppReaderTest, InitReaderFailsWithoutPaimonSplit) {
+    TFileRangeDesc range;
+    range.__isset.table_format_params = true;
+    range.table_format_params.__isset.paimon_params = true;
+    range.table_format_params.paimon_params.__isset.paimon_table = true;
+    range.table_format_params.paimon_params.paimon_table = 
"s3://bucket/db.tbl";
+
+    PaimonCppReader reader(_file_slot_descs, _runtime_state.get(), &_profile, 
range, nullptr);
+    auto status = reader.init_reader();
+
+    ASSERT_FALSE(status.ok());
+    EXPECT_NE(status.to_string().find("missing paimon_split"), 
std::string::npos);
+}
+
+} // namespace doris::vectorized
diff --git 
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonUtils.java
 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonUtils.java
index 44ffb298c98..d1d6c2f9bdb 100644
--- 
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonUtils.java
+++ 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonUtils.java
@@ -26,7 +26,8 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 public class PaimonUtils {
-    private static final Base64.Decoder DECODER = Base64.getUrlDecoder();
+    private static final Base64.Decoder URL_DECODER = Base64.getUrlDecoder();
+    private static final Base64.Decoder STD_DECODER = Base64.getDecoder();
 
     public static List<String> getFieldNames(RowType rowType) {
         return rowType.getFields().stream()
@@ -37,9 +38,14 @@ public class PaimonUtils {
 
     public static <T> T deserialize(String encodedStr) {
         try {
-            return InstantiationUtil.deserializeObject(
-                    
DECODER.decode(encodedStr.getBytes(java.nio.charset.StandardCharsets.UTF_8)),
-                    PaimonUtils.class.getClassLoader());
+            byte[] decoded;
+            try {
+                decoded = 
URL_DECODER.decode(encodedStr.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            } catch (IllegalArgumentException e) {
+                // Fallback to standard Base64 for splits encoded by native 
Paimon serialization.
+                decoded = 
STD_DECODER.decode(encodedStr.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            }
+            return InstantiationUtil.deserializeObject(decoded, 
PaimonUtils.class.getClassLoader());
         } catch (Throwable e) {
             throw new RuntimeException(e);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
index 4c343bd1266..d4c9ddf6c2f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
@@ -51,6 +51,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.predicate.Predicate;
@@ -60,6 +61,7 @@ import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.tag.Tag;
 import org.apache.paimon.types.ArrayType;
@@ -78,6 +80,7 @@ import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Projection;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 
+import java.io.ByteArrayOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.time.DateTimeException;
@@ -509,6 +512,23 @@ public class PaimonUtil {
         }
     }
 
+    /**
+     * Serialize DataSplit using Paimon's native binary format.
+     * This format is compatible with paimon-cpp reader.
+     * Uses standard Base64 encoding (not URL-safe) for BE compatibility.
+     */
+    public static String encodeDataSplitToString(DataSplit split) {
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);
+            split.serialize(out);
+            byte[] bytes = baos.toByteArray();
+            return Base64.getEncoder().encodeToString(bytes);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to serialize DataSplit using 
Paimon native format", e);
+        }
+    }
+
     public static Map<String, String> getPartitionInfoMap(Table table, 
BinaryRow partitionValues, String timeZone) {
         Map<String, String> partitionInfoMap = new HashMap<>();
         List<String> partitionKeys = table.partitionKeys();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index f3f71fcfcc9..6323972dd16 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -228,9 +228,19 @@ public class PaimonScanNode extends FileQueryScanNode {
 
         String fileFormat = getFileFormat(paimonSplit.getPathString());
         if (split != null) {
-            // use jni reader
+            // use jni reader or paimon-cpp reader
             rangeDesc.setFormatType(TFileFormatType.FORMAT_JNI);
-            fileDesc.setPaimonSplit(PaimonUtil.encodeObjectToString(split));
+            // Use Paimon native serialization for paimon-cpp reader
+            if (sessionVariable.isEnablePaimonCppReader() && split instanceof 
DataSplit) {
+                
fileDesc.setPaimonSplit(PaimonUtil.encodeDataSplitToString((DataSplit) split));
+            } else {
+                
fileDesc.setPaimonSplit(PaimonUtil.encodeObjectToString(split));
+            }
+            // Set table location for paimon-cpp reader
+            String tableLocation = source.getTableLocation();
+            if (tableLocation != null) {
+                fileDesc.setPaimonTable(tableLocation);
+            }
             rangeDesc.setSelfSplitWeight(paimonSplit.getSelfSplitWeight());
         } else {
             // use native reader
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
index 2dc87664f69..43c6ef41701 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
@@ -29,6 +29,7 @@ import 
org.apache.doris.datasource.paimon.PaimonSysExternalTable;
 import org.apache.doris.thrift.TFileAttributes;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 
 import java.util.Optional;
@@ -90,4 +91,12 @@ public class PaimonSource {
     public String getFileFormatFromTableProperties() {
         return originTable.options().getOrDefault("file.format", "parquet");
     }
+
+    public String getTableLocation() {
+        if (originTable instanceof FileStoreTable) {
+            return ((FileStoreTable) originTable).location().toString();
+        }
+        // Fallback to path option
+        return originTable.options().get("path");
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 7aa82a3e0ee..3fc7d3a7634 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -740,6 +740,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String FORCE_JNI_SCANNER = "force_jni_scanner";
 
+    public static final String ENABLE_PAIMON_CPP_READER = 
"enable_paimon_cpp_reader";
+
     public static final String ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE = 
"enable_count_push_down_for_external_table";
 
     public static final String FETCH_ALL_FE_FOR_SYSTEM_TABLE = 
"fetch_all_fe_for_system_table";
@@ -1314,13 +1316,14 @@ public class SessionVariable implements Serializable, 
Writable {
     public enum IgnoreSplitType {
         NONE,
         IGNORE_JNI,
-        IGNORE_NATIVE
+        IGNORE_NATIVE,
+        IGNORE_PAIMON_CPP
     }
 
     public static final String IGNORE_SPLIT_TYPE = "ignore_split_type";
     @VariableMgr.VarAttr(name = IGNORE_SPLIT_TYPE,
             checker = "checkIgnoreSplitType",
-            options = {"NONE", "IGNORE_JNI", "IGNORE_NATIVE"},
+            options = {"NONE", "IGNORE_JNI", "IGNORE_NATIVE", 
"IGNORE_PAIMON_CPP"},
             description = {"忽略指定类型的 split", "Ignore splits of the specified 
type"})
     public String ignoreSplitType = IgnoreSplitType.NONE.toString();
 
@@ -2766,6 +2769,11 @@ public class SessionVariable implements Serializable, 
Writable {
             description = {"强制使用 jni 方式读取外表", "Force the use of jni mode to 
read external table"})
     private boolean forceJniScanner = false;
 
+    @VariableMgr.VarAttr(name = ENABLE_PAIMON_CPP_READER,
+            fuzzy = true,
+            description = {"Paimon 非原生文件读取使用 paimon-cpp", "Use paimon-cpp for 
non-native Paimon reads"})
+    private boolean enablePaimonCppReader = false;
+
     @VariableMgr.VarAttr(name = ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE,
             fuzzy = true,
             description = {"对外表启用 count(*) 下推优化", "enable count(*) pushdown 
optimization for external table"})
@@ -3620,6 +3628,7 @@ public class SessionVariable implements Serializable, 
Writable {
 
         // jni
         this.forceJniScanner = random.nextBoolean();
+        this.enablePaimonCppReader = random.nextBoolean();
 
         // statistics
         this.fetchHiveRowCountSync = random.nextBoolean();
@@ -5183,6 +5192,7 @@ public class SessionVariable implements Serializable, 
Writable {
 
         tResult.setEnableParquetFilePageCache(enableParquetFilePageCache);
         tResult.setEnableOrcFilterByMinMax(enableOrcFilterByMinMax);
+        tResult.setEnablePaimonCppReader(enablePaimonCppReader);
         tResult.setCheckOrcInitSargsSuccess(checkOrcInitSargsSuccess);
 
         tResult.setTruncateCharOrVarcharColumns(truncateCharOrVarcharColumns);
@@ -5923,6 +5933,10 @@ public class SessionVariable implements Serializable, 
Writable {
         return forceJniScanner;
     }
 
+    public boolean isEnablePaimonCppReader() {
+        return enablePaimonCppReader;
+    }
+
     public String getIgnoreSplitType() {
         return ignoreSplitType;
     }
@@ -5931,7 +5945,8 @@ public class SessionVariable implements Serializable, 
Writable {
         try {
             IgnoreSplitType.valueOf(value);
         } catch (Exception e) {
-            throw new UnsupportedOperationException("We only support `NONE`, 
`IGNORE_JNI` and `IGNORE_NATIVE`");
+            throw new UnsupportedOperationException(
+                    "We only support `NONE`, `IGNORE_JNI`, `IGNORE_NATIVE` and 
`IGNORE_PAIMON_CPP`");
         }
     }
 
@@ -5943,6 +5958,10 @@ public class SessionVariable implements Serializable, 
Writable {
         forceJniScanner = force;
     }
 
+    public void setEnablePaimonCppReader(boolean enable) {
+        enablePaimonCppReader = enable;
+    }
+
     public boolean isEnableCountPushDownForExternalTable() {
         return enableCountPushDownForExternalTable;
     }
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index a1b9e237c6c..e495d529ef6 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -437,6 +437,8 @@ struct TQueryOptions {
   195: optional bool enable_left_semi_direct_return_opt;
 
   200: optional bool enable_adjust_conjunct_order_by_cost;
+  // Use paimon-cpp to read Paimon splits on BE
+  201: optional bool enable_paimon_cpp_reader = false;
 
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to