This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 798d64411e7 [fix](wal) rename wal_reader and move to wal directory (#61135)
798d64411e7 is described below

commit 798d64411e7d6e121e1cca136e9b62b738fef78a
Author: meiyi <[email protected]>
AuthorDate: Mon Mar 9 11:33:04 2026 +0800

    [fix](wal) rename wal_reader and move to wal directory (#61135)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
 be/src/exec/scan/file_scanner.cpp                  |   2 +-
 be/src/format/wal/wal_reader.cpp                   | 124 ------------
 be/src/format/wal/wal_reader.h                     |  56 ------
 .../wal/{wal_reader.cpp => wal_file_reader.cpp}    |  20 +-
 .../wal/{wal_reader.h => wal_file_reader.h}        |   6 +-
 be/src/load/group_commit/wal/wal_manager.cpp       |   2 +-
 be/src/load/group_commit/wal/wal_reader.cpp        | 210 +++++++++------------
 be/src/load/group_commit/wal/wal_reader.h          |  55 +++---
 be/src/load/group_commit/wal/wal_table.cpp         |   3 +-
 be/test/format/wal/wal_reader_writer_test.cpp      |   4 +-
 10 files changed, 139 insertions(+), 343 deletions(-)

diff --git a/be/src/exec/scan/file_scanner.cpp 
b/be/src/exec/scan/file_scanner.cpp
index c13185f55de..04902e58b6e 100644
--- a/be/src/exec/scan/file_scanner.cpp
+++ b/be/src/exec/scan/file_scanner.cpp
@@ -78,8 +78,8 @@
 #include "format/table/transactional_hive_reader.h"
 #include "format/table/trino_connector_jni_reader.h"
 #include "format/text/text_reader.h"
-#include "format/wal/wal_reader.h"
 #include "io/cache/block_file_cache_profile.h"
+#include "load/group_commit/wal/wal_reader.h"
 #include "runtime/descriptors.h"
 #include "runtime/runtime_profile.h"
 #include "runtime/runtime_state.h"
diff --git a/be/src/format/wal/wal_reader.cpp b/be/src/format/wal/wal_reader.cpp
deleted file mode 100644
index 78d5e855659..00000000000
--- a/be/src/format/wal/wal_reader.cpp
+++ /dev/null
@@ -1,124 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "format/wal/wal_reader.h"
-
-#include <absl/strings/str_split.h>
-
-#include "agent/be_exec_version_manager.h"
-#include "common/logging.h"
-#include "core/block/block.h"
-#include "cpp/sync_point.h"
-#include "load/group_commit/wal/wal_manager.h"
-#include "runtime/runtime_state.h"
-
-namespace doris::vectorized {
-#include "common/compile_check_begin.h"
-WalReader::WalReader(RuntimeState* state) : _state(state) {
-    _wal_id = state->wal_id();
-}
-
-Status WalReader::init_reader(const TupleDescriptor* tuple_descriptor) {
-    _tuple_descriptor = tuple_descriptor;
-    RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_path(_wal_id, 
_wal_path));
-    _wal_reader = std::make_shared<doris::WalReader>(_wal_path);
-    RETURN_IF_ERROR(_wal_reader->init());
-    return Status::OK();
-}
-
-Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
-    //read src block
-    PBlock pblock;
-    auto st = _wal_reader->read_block(pblock);
-    if (st.is<ErrorCode::END_OF_FILE>()) {
-        LOG(INFO) << "read eof on wal:" << _wal_path;
-        *read_rows = 0;
-        *eof = true;
-        return Status::OK();
-    }
-    if (!st.ok()) {
-        LOG(WARNING) << "Failed to read wal on path = " << _wal_path;
-        return st;
-    }
-    int be_exec_version = pblock.has_be_exec_version() ? 
pblock.be_exec_version() : 0;
-    if (!BeExecVersionManager::check_be_exec_version(be_exec_version)) {
-        return Status::DataQualityError("check be exec version fail when 
reading wal file {}",
-                                        _wal_path);
-    }
-    Block src_block;
-    size_t uncompressed_size = 0;
-    int64_t uncompressed_time = 0;
-    RETURN_IF_ERROR(src_block.deserialize(pblock, &uncompressed_size, 
&uncompressed_time));
-    //convert to dst block
-    Block dst_block;
-    int index = 0;
-    auto output_block_columns = block->get_columns_with_type_and_name();
-    size_t output_block_column_size = output_block_columns.size();
-    TEST_SYNC_POINT_CALLBACK("WalReader::set_column_id_count", 
&_column_id_count);
-    TEST_SYNC_POINT_CALLBACK("WalReader::set_out_block_column_size", 
&output_block_column_size);
-    if (_column_id_count != src_block.columns() ||
-        output_block_column_size != _tuple_descriptor->slots().size()) {
-        return Status::InternalError(
-                "not equal wal _column_id_count={} vs wal block columns 
size={}, "
-                "output block columns size={} vs tuple_descriptor size={}",
-                std::to_string(_column_id_count), 
std::to_string(src_block.columns()),
-                std::to_string(output_block_column_size),
-                std::to_string(_tuple_descriptor->slots().size()));
-    }
-    for (auto* slot_desc : _tuple_descriptor->slots()) {
-        auto pos = _column_pos_map[slot_desc->col_unique_id()];
-        if (pos >= src_block.columns()) {
-            return Status::InternalError("read wal {} fail, pos {}, columns 
size {}", _wal_path,
-                                         pos, src_block.columns());
-        }
-        vectorized::ColumnPtr column_ptr = 
src_block.get_by_position(pos).column;
-        if (!column_ptr && slot_desc->is_nullable()) {
-            column_ptr = make_nullable(column_ptr);
-        }
-        dst_block.insert(index, vectorized::ColumnWithTypeAndName(
-                                        std::move(column_ptr), 
output_block_columns[index].type,
-                                        output_block_columns[index].name));
-        index++;
-    }
-    block->swap(dst_block);
-    *read_rows = block->rows();
-    VLOG_DEBUG << "read block rows:" << *read_rows;
-    return Status::OK();
-}
-
-Status WalReader::get_columns(std::unordered_map<std::string, DataTypePtr>* 
name_to_type,
-                              std::unordered_set<std::string>* missing_cols) {
-    std::string col_ids;
-    RETURN_IF_ERROR(_wal_reader->read_header(_version, col_ids));
-    std::vector<std::string> column_id_vector =
-            absl::StrSplit(col_ids, ",", absl::SkipWhitespace());
-    _column_id_count = column_id_vector.size();
-    try {
-        int64_t pos = 0;
-        for (auto col_id_str : column_id_vector) {
-            auto col_id = std::strtoll(col_id_str.c_str(), nullptr, 10);
-            _column_pos_map.emplace(col_id, pos);
-            pos++;
-        }
-    } catch (const std::invalid_argument& e) {
-        return Status::InvalidArgument("Invalid format, {}", e.what());
-    }
-    return Status::OK();
-}
-
-#include "common/compile_check_end.h"
-} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/format/wal/wal_reader.h b/be/src/format/wal/wal_reader.h
deleted file mode 100644
index e2e9f02c248..00000000000
--- a/be/src/format/wal/wal_reader.h
+++ /dev/null
@@ -1,56 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-#include "format/generic_reader.h"
-#include "load/group_commit/wal/wal_reader.h"
-#include "runtime/descriptors.h"
-
-namespace doris::vectorized {
-#include "common/compile_check_begin.h"
-struct ScannerCounter;
-class WalReader : public GenericReader {
-    ENABLE_FACTORY_CREATOR(WalReader);
-
-public:
-    WalReader(RuntimeState* state);
-    ~WalReader() override = default;
-    Status init_reader(const TupleDescriptor* tuple_descriptor);
-    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
-    Status get_columns(std::unordered_map<std::string, DataTypePtr>* 
name_to_type,
-                       std::unordered_set<std::string>* missing_cols) override;
-
-    Status close() override {
-        if (_wal_reader) {
-            return _wal_reader->finalize();
-        }
-        return Status::OK();
-    }
-
-private:
-    RuntimeState* _state = nullptr;
-    int64_t _wal_id;
-    std::string _wal_path;
-    std::shared_ptr<doris::WalReader> _wal_reader = nullptr;
-    const TupleDescriptor* _tuple_descriptor = nullptr;
-    // column_id, column_pos
-    std::map<int64_t, int64_t> _column_pos_map;
-    int64_t _column_id_count;
-    uint32_t _version = 0;
-};
-#include "common/compile_check_end.h"
-} // namespace doris::vectorized
diff --git a/be/src/load/group_commit/wal/wal_reader.cpp 
b/be/src/load/group_commit/wal/wal_file_reader.cpp
similarity index 90%
copy from be/src/load/group_commit/wal/wal_reader.cpp
copy to be/src/load/group_commit/wal/wal_file_reader.cpp
index 9329fee0199..cd9430a6c18 100644
--- a/be/src/load/group_commit/wal/wal_reader.cpp
+++ b/be/src/load/group_commit/wal/wal_file_reader.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "load/group_commit/wal/wal_reader.h"
+#include "load/group_commit/wal/wal_file_reader.h"
 
 #include <crc32c/crc32c.h>
 
@@ -32,11 +32,11 @@
 
 namespace doris {
 
-WalReader::WalReader(const std::string& file_name) : _file_name(file_name), 
_offset(0) {}
+WalFileReader::WalFileReader(const std::string& file_name) : 
_file_name(file_name), _offset(0) {}
 
-WalReader::~WalReader() = default;
-Status WalReader::_deserialize(PBlock& block, const std::string& buf, size_t 
block_len,
-                               size_t bytes_read) {
+WalFileReader::~WalFileReader() = default;
+Status WalFileReader::_deserialize(PBlock& block, const std::string& buf, 
size_t block_len,
+                                   size_t bytes_read) {
     if (UNLIKELY(!block.ParseFromString(buf))) {
         return Status::InternalError(
                 "failed to deserialize row, file_size=" + 
std::to_string(file_reader->size()) +
@@ -57,7 +57,7 @@ std::pair<int64_t, int64_t> parse_db_tb_from_wal_path(const 
std::string& wal_pat
     return {db_id, tb_id};
 }
 
-Status WalReader::init() {
+Status WalFileReader::init() {
     auto [db_id, tb_id] = parse_db_tb_from_wal_path(_file_name);
     io::FileSystemSPtr fs;
     RETURN_IF_ERROR(determine_wal_fs(db_id, tb_id, fs));
@@ -72,14 +72,14 @@ Status WalReader::init() {
     return Status::OK();
 }
 
-Status WalReader::finalize() {
+Status WalFileReader::finalize() {
     if (file_reader) {
         return file_reader->close();
     }
     return Status::OK();
 }
 
-Status WalReader::read_block(PBlock& block) {
+Status WalFileReader::read_block(PBlock& block) {
     if (_offset >= file_reader->size()) {
         return Status::EndOfFile("end of wal file");
     }
@@ -114,7 +114,7 @@ Status WalReader::read_block(PBlock& block) {
     return Status::OK();
 }
 
-Status WalReader::read_header(uint32_t& version, std::string& col_ids) {
+Status WalFileReader::read_header(uint32_t& version, std::string& col_ids) {
     if (file_reader->size() == 0) {
         return Status::DataQualityError("empty file");
     }
@@ -145,7 +145,7 @@ Status WalReader::read_header(uint32_t& version, 
std::string& col_ids) {
     return Status::OK();
 }
 
-Status WalReader::_check_checksum(const char* binary, size_t size, uint32_t 
checksum) {
+Status WalFileReader::_check_checksum(const char* binary, size_t size, 
uint32_t checksum) {
     uint32_t computed_checksum = crc32c::Crc32c(binary, size);
     if (LIKELY(computed_checksum == checksum)) {
         return Status::OK();
diff --git a/be/src/load/group_commit/wal/wal_reader.h 
b/be/src/load/group_commit/wal/wal_file_reader.h
similarity index 91%
copy from be/src/load/group_commit/wal/wal_reader.h
copy to be/src/load/group_commit/wal/wal_file_reader.h
index 10a1f1e2229..574f46f2cf8 100644
--- a/be/src/load/group_commit/wal/wal_reader.h
+++ b/be/src/load/group_commit/wal/wal_file_reader.h
@@ -24,10 +24,10 @@
 
 namespace doris {
 
-class WalReader {
+class WalFileReader {
 public:
-    explicit WalReader(const std::string& file_name);
-    ~WalReader();
+    explicit WalFileReader(const std::string& file_name);
+    ~WalFileReader();
 
     Status init();
     Status finalize();
diff --git a/be/src/load/group_commit/wal/wal_manager.cpp 
b/be/src/load/group_commit/wal/wal_manager.cpp
index 962061a236b..06d009404f7 100644
--- a/be/src/load/group_commit/wal/wal_manager.cpp
+++ b/be/src/load/group_commit/wal/wal_manager.cpp
@@ -30,9 +30,9 @@
 
 #include "common/config.h"
 #include "common/status.h"
-#include "format/wal/wal_reader.h"
 #include "io/fs/local_file_system.h"
 #include "load/group_commit/wal/wal_dirs_info.h"
+#include "load/group_commit/wal/wal_reader.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
 #include "util/parse_util.h"
diff --git a/be/src/load/group_commit/wal/wal_reader.cpp 
b/be/src/load/group_commit/wal/wal_reader.cpp
index 9329fee0199..a2ff97f1c02 100644
--- a/be/src/load/group_commit/wal/wal_reader.cpp
+++ b/be/src/load/group_commit/wal/wal_reader.cpp
@@ -15,144 +15,110 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "load/group_commit/wal/wal_reader.h"
+#include "wal_reader.h"
 
-#include <crc32c/crc32c.h>
+#include <absl/strings/str_split.h>
 
-#include <string_view>
-#include <utility>
+#include "agent/be_exec_version_manager.h"
+#include "common/logging.h"
+#include "core/block/block.h"
+#include "cpp/sync_point.h"
+#include "load/group_commit/wal/wal_manager.h"
+#include "runtime/runtime_state.h"
 
-#include "common/status.h"
-#include "io/fs/file_reader.h"
-#include "io/fs/file_system.h"
-#include "io/fs/path.h"
-#include "load/group_commit/wal/wal_writer.h"
-#include "util/coding.h"
-#include "util/string_util.h"
-
-namespace doris {
-
-WalReader::WalReader(const std::string& file_name) : _file_name(file_name), 
_offset(0) {}
-
-WalReader::~WalReader() = default;
-Status WalReader::_deserialize(PBlock& block, const std::string& buf, size_t 
block_len,
-                               size_t bytes_read) {
-    if (UNLIKELY(!block.ParseFromString(buf))) {
-        return Status::InternalError(
-                "failed to deserialize row, file_size=" + 
std::to_string(file_reader->size()) +
-                ", read_offset=" + std::to_string(_offset) + +", block_bytes=" 
+
-                std::to_string(block_len) + ", read_block_bytes=" + 
std::to_string(bytes_read));
-    }
-    return Status::OK();
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+WalReader::WalReader(RuntimeState* state) : _state(state) {
+    _wal_id = state->wal_id();
 }
 
-std::pair<int64_t, int64_t> parse_db_tb_from_wal_path(const std::string& 
wal_path) {
-    auto ret = split(wal_path, "/");
-    DCHECK_GT(ret.size(), 3);
-    auto db_id_pos = ret.size() - 1 - 2;
-    auto tb_id_pos = ret.size() - 1 - 1;
-    auto db_id = std::stoll(ret[db_id_pos]);
-    auto tb_id = std::stoll(ret[tb_id_pos]);
-
-    return {db_id, tb_id};
-}
-
-Status WalReader::init() {
-    auto [db_id, tb_id] = parse_db_tb_from_wal_path(_file_name);
-    io::FileSystemSPtr fs;
-    RETURN_IF_ERROR(determine_wal_fs(db_id, tb_id, fs));
-    bool exists = false;
-    RETURN_IF_ERROR(fs->exists(_file_name, &exists));
-    if (!exists) {
-        LOG(WARNING) << "not exist wal= " << _file_name;
-        return Status::NotFound("wal {} doesn't exist", _file_name);
-    }
-    RETURN_IF_ERROR(fs->open_file(_file_name, &file_reader));
-
+Status WalReader::init_reader(const TupleDescriptor* tuple_descriptor) {
+    _tuple_descriptor = tuple_descriptor;
+    RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_path(_wal_id, 
_wal_path));
+    _wal_reader = std::make_shared<doris::WalFileReader>(_wal_path);
+    RETURN_IF_ERROR(_wal_reader->init());
     return Status::OK();
 }
 
-Status WalReader::finalize() {
-    if (file_reader) {
-        return file_reader->close();
-    }
-    return Status::OK();
-}
-
-Status WalReader::read_block(PBlock& block) {
-    if (_offset >= file_reader->size()) {
-        return Status::EndOfFile("end of wal file");
+Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    //read src block
+    PBlock pblock;
+    auto st = _wal_reader->read_block(pblock);
+    if (st.is<ErrorCode::END_OF_FILE>()) {
+        LOG(INFO) << "read eof on wal:" << _wal_path;
+        *read_rows = 0;
+        *eof = true;
+        return Status::OK();
     }
-    size_t bytes_read = 0;
-    uint8_t row_len_buf[WalWriter::LENGTH_SIZE];
-    RETURN_IF_ERROR(
-            file_reader->read_at(_offset, {row_len_buf, 
WalWriter::LENGTH_SIZE}, &bytes_read));
-    _offset += WalWriter::LENGTH_SIZE;
-    size_t block_len = decode_fixed64_le(row_len_buf);
-    if (block_len == 0) {
-        return Status::DataQualityError("fail to read wal {} ,block is empty", 
_file_name);
+    if (!st.ok()) {
+        LOG(WARNING) << "Failed to read wal on path = " << _wal_path;
+        return st;
     }
-    if (_offset == file_reader->size()) {
-        LOG(WARNING) << "need read block with length=" << block_len << ", but 
offset=" << _offset
-                     << " reached end of WAL (path=" << _file_name
-                     << ", size=" << file_reader->size() << ")";
-        return Status::EndOfFile("end of wal file");
+    int be_exec_version = pblock.has_be_exec_version() ? 
pblock.be_exec_version() : 0;
+    if (!BeExecVersionManager::check_be_exec_version(be_exec_version)) {
+        return Status::DataQualityError("check be exec version fail when 
reading wal file {}",
+                                        _wal_path);
     }
-    // read block
-    std::string block_buf;
-    block_buf.resize(block_len);
-    RETURN_IF_ERROR(file_reader->read_at(_offset, {block_buf.c_str(), 
block_len}, &bytes_read));
-    RETURN_IF_ERROR(_deserialize(block, block_buf, block_len, bytes_read));
-    _offset += block_len;
-    // checksum
-    uint8_t checksum_len_buf[WalWriter::CHECKSUM_SIZE];
-    RETURN_IF_ERROR(file_reader->read_at(_offset, {checksum_len_buf, 
WalWriter::CHECKSUM_SIZE},
-                                         &bytes_read));
-    _offset += WalWriter::CHECKSUM_SIZE;
-    uint32_t checksum = decode_fixed32_le(checksum_len_buf);
-    RETURN_IF_ERROR(_check_checksum(block_buf.data(), block_len, checksum));
-    return Status::OK();
-}
-
-Status WalReader::read_header(uint32_t& version, std::string& col_ids) {
-    if (file_reader->size() == 0) {
-        return Status::DataQualityError("empty file");
-    }
-    size_t bytes_read = 0;
-    std::string magic_str;
-    magic_str.resize(k_wal_magic_length);
-    RETURN_IF_ERROR(file_reader->read_at(_offset, magic_str, &bytes_read));
-    if (strcmp(magic_str.c_str(), k_wal_magic) != 0) {
-        return Status::Corruption("Bad wal file {}: magic number not match", 
_file_name);
+    Block src_block;
+    size_t uncompressed_size = 0;
+    int64_t uncompressed_time = 0;
+    RETURN_IF_ERROR(src_block.deserialize(pblock, &uncompressed_size, 
&uncompressed_time));
+    //convert to dst block
+    Block dst_block;
+    int index = 0;
+    auto output_block_columns = block->get_columns_with_type_and_name();
+    size_t output_block_column_size = output_block_columns.size();
+    TEST_SYNC_POINT_CALLBACK("WalReader::set_column_id_count", 
&_column_id_count);
+    TEST_SYNC_POINT_CALLBACK("WalReader::set_out_block_column_size", 
&output_block_column_size);
+    if (_column_id_count != src_block.columns() ||
+        output_block_column_size != _tuple_descriptor->slots().size()) {
+        return Status::InternalError(
+                "not equal wal _column_id_count={} vs wal block columns 
size={}, "
+                "output block columns size={} vs tuple_descriptor size={}",
+                std::to_string(_column_id_count), 
std::to_string(src_block.columns()),
+                std::to_string(output_block_column_size),
+                std::to_string(_tuple_descriptor->slots().size()));
     }
-    _offset += k_wal_magic_length;
-    uint8_t version_buf[WalWriter::VERSION_SIZE];
-    RETURN_IF_ERROR(
-            file_reader->read_at(_offset, {version_buf, 
WalWriter::VERSION_SIZE}, &bytes_read));
-    _offset += WalWriter::VERSION_SIZE;
-    version = decode_fixed32_le(version_buf);
-    uint8_t len_buf[WalWriter::LENGTH_SIZE];
-    RETURN_IF_ERROR(file_reader->read_at(_offset, {len_buf, 
WalWriter::LENGTH_SIZE}, &bytes_read));
-    _offset += WalWriter::LENGTH_SIZE;
-    size_t len = decode_fixed64_le(len_buf);
-    col_ids.resize(len);
-    RETURN_IF_ERROR(file_reader->read_at(_offset, col_ids, &bytes_read));
-    _offset += len;
-    if (len != bytes_read) {
-        return Status::InternalError("failed to read header expected= " + 
std::to_string(len) +
-                                     ",actually=" + 
std::to_string(bytes_read));
+    for (auto* slot_desc : _tuple_descriptor->slots()) {
+        auto pos = _column_pos_map[slot_desc->col_unique_id()];
+        if (pos >= src_block.columns()) {
+            return Status::InternalError("read wal {} fail, pos {}, columns 
size {}", _wal_path,
+                                         pos, src_block.columns());
+        }
+        vectorized::ColumnPtr column_ptr = 
src_block.get_by_position(pos).column;
+        if (!column_ptr && slot_desc->is_nullable()) {
+            column_ptr = make_nullable(column_ptr);
+        }
+        dst_block.insert(index, vectorized::ColumnWithTypeAndName(
+                                        std::move(column_ptr), 
output_block_columns[index].type,
+                                        output_block_columns[index].name));
+        index++;
     }
+    block->swap(dst_block);
+    *read_rows = block->rows();
+    VLOG_DEBUG << "read block rows:" << *read_rows;
     return Status::OK();
 }
 
-Status WalReader::_check_checksum(const char* binary, size_t size, uint32_t 
checksum) {
-    uint32_t computed_checksum = crc32c::Crc32c(binary, size);
-    if (LIKELY(computed_checksum == checksum)) {
-        return Status::OK();
+Status WalReader::get_columns(std::unordered_map<std::string, DataTypePtr>* 
name_to_type,
+                              std::unordered_set<std::string>* missing_cols) {
+    std::string col_ids;
+    RETURN_IF_ERROR(_wal_reader->read_header(_version, col_ids));
+    std::vector<std::string> column_id_vector =
+            absl::StrSplit(col_ids, ",", absl::SkipWhitespace());
+    _column_id_count = column_id_vector.size();
+    try {
+        int64_t pos = 0;
+        for (auto col_id_str : column_id_vector) {
+            auto col_id = std::strtoll(col_id_str.c_str(), nullptr, 10);
+            _column_pos_map.emplace(col_id, pos);
+            pos++;
+        }
+    } catch (const std::invalid_argument& e) {
+        return Status::InvalidArgument("Invalid format, {}", e.what());
     }
-    return Status::InternalError("checksum failed for wal=" + _file_name +
-                                 ", computed checksum=" + 
std::to_string(computed_checksum) +
-                                 ", expected=" + std::to_string(checksum));
+    return Status::OK();
 }
 
-} // namespace doris
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/load/group_commit/wal/wal_reader.h 
b/be/src/load/group_commit/wal/wal_reader.h
index 10a1f1e2229..b0ccd2b5008 100644
--- a/be/src/load/group_commit/wal/wal_reader.h
+++ b/be/src/load/group_commit/wal/wal_reader.h
@@ -16,32 +16,41 @@
 // under the License.
 
 #pragma once
+#include "format/generic_reader.h"
+#include "load/group_commit/wal/wal_file_reader.h"
+#include "runtime/descriptors.h"
 
-#include <gen_cpp/internal_service.pb.h>
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+struct ScannerCounter;
+class WalReader : public GenericReader {
+    ENABLE_FACTORY_CREATOR(WalReader);
 
-#include "common/status.h"
-#include "io/fs/file_reader_writer_fwd.h"
-
-namespace doris {
-
-class WalReader {
 public:
-    explicit WalReader(const std::string& file_name);
-    ~WalReader();
-
-    Status init();
-    Status finalize();
-
-    Status read_block(PBlock& block);
-    Status read_header(uint32_t& version, std::string& col_ids);
+    WalReader(RuntimeState* state);
+    ~WalReader() override = default;
+    Status init_reader(const TupleDescriptor* tuple_descriptor);
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+    Status get_columns(std::unordered_map<std::string, DataTypePtr>* 
name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override;
+
+    Status close() override {
+        if (_wal_reader) {
+            return _wal_reader->finalize();
+        }
+        return Status::OK();
+    }
 
 private:
-    Status _deserialize(PBlock& block, const std::string& buf, size_t 
block_len, size_t bytes_read);
-    Status _check_checksum(const char* binary, size_t size, uint32_t checksum);
-
-    std::string _file_name;
-    size_t _offset;
-    io::FileReaderSPtr file_reader;
+    RuntimeState* _state = nullptr;
+    int64_t _wal_id;
+    std::string _wal_path;
+    std::shared_ptr<doris::WalFileReader> _wal_reader = nullptr;
+    const TupleDescriptor* _tuple_descriptor = nullptr;
+    // column_id, column_pos
+    std::map<int64_t, int64_t> _column_pos_map;
+    int64_t _column_id_count;
+    uint32_t _version = 0;
 };
-
-} // namespace doris
\ No newline at end of file
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/load/group_commit/wal/wal_table.cpp 
b/be/src/load/group_commit/wal/wal_table.cpp
index 8d99f8ca509..4c6c0e1edb5 100644
--- a/be/src/load/group_commit/wal/wal_table.cpp
+++ b/be/src/load/group_commit/wal/wal_table.cpp
@@ -334,7 +334,8 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t 
tb_id,
 }
 
 Status WalTable::_read_wal_header(const std::string& wal_path, std::string& 
columns) {
-    std::shared_ptr<doris::WalReader> wal_reader = 
std::make_shared<WalReader>(wal_path);
+    std::shared_ptr<doris::WalFileReader> wal_reader =
+            std::make_shared<doris::WalFileReader>(wal_path);
     RETURN_IF_ERROR(wal_reader->init());
     uint32_t version = 0;
     RETURN_IF_ERROR(wal_reader->read_header(version, columns));
diff --git a/be/test/format/wal/wal_reader_writer_test.cpp 
b/be/test/format/wal/wal_reader_writer_test.cpp
index 4cad033f027..d18c5ec97e0 100644
--- a/be/test/format/wal/wal_reader_writer_test.cpp
+++ b/be/test/format/wal/wal_reader_writer_test.cpp
@@ -27,7 +27,7 @@
 #include "exec/exchange/vdata_stream_recvr.h"
 #include "gmock/gmock.h"
 #include "io/fs/local_file_system.h"
-#include "load/group_commit/wal/wal_reader.h"
+#include "load/group_commit/wal/wal_file_reader.h"
 #include "load/group_commit/wal/wal_writer.h"
 #include "runtime/exec_env.h"
 #include "service/brpc.h"
@@ -121,7 +121,7 @@ TEST_F(WalReaderWriterTest, TestWriteAndRead1) {
     }
     static_cast<void>(wal_writer.finalize());
     // read block
-    auto wal_reader = WalReader(file_name);
+    auto wal_reader = WalFileReader(file_name);
     static_cast<void>(wal_reader.init());
     auto block_count = 0;
     while (true) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to