Copilot commented on code in PR #61135:
URL: https://github.com/apache/doris/pull/61135#discussion_r2902992207


##########
be/src/load/group_commit/wal/wal_reader.cpp:
##########
@@ -15,144 +15,110 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "load/group_commit/wal/wal_reader.h"
+#include "wal_reader.h"
 
-#include <crc32c/crc32c.h>
+#include <absl/strings/str_split.h>
 
-#include <string_view>
-#include <utility>
+#include "agent/be_exec_version_manager.h"
+#include "common/logging.h"
+#include "core/block/block.h"
+#include "cpp/sync_point.h"
+#include "load/group_commit/wal/wal_manager.h"
+#include "runtime/runtime_state.h"
 
-#include "common/status.h"
-#include "io/fs/file_reader.h"
-#include "io/fs/file_system.h"
-#include "io/fs/path.h"
-#include "load/group_commit/wal/wal_writer.h"
-#include "util/coding.h"
-#include "util/string_util.h"
-
-namespace doris {
-
-WalReader::WalReader(const std::string& file_name) : _file_name(file_name), 
_offset(0) {}
-
-WalReader::~WalReader() = default;
-Status WalReader::_deserialize(PBlock& block, const std::string& buf, size_t 
block_len,
-                               size_t bytes_read) {
-    if (UNLIKELY(!block.ParseFromString(buf))) {
-        return Status::InternalError(
-                "failed to deserialize row, file_size=" + 
std::to_string(file_reader->size()) +
-                ", read_offset=" + std::to_string(_offset) + +", block_bytes=" 
+
-                std::to_string(block_len) + ", read_block_bytes=" + 
std::to_string(bytes_read));
-    }
-    return Status::OK();
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+WalReader::WalReader(RuntimeState* state) : _state(state) {
+    _wal_id = state->wal_id();
 }
 
-std::pair<int64_t, int64_t> parse_db_tb_from_wal_path(const std::string& 
wal_path) {
-    auto ret = split(wal_path, "/");
-    DCHECK_GT(ret.size(), 3);
-    auto db_id_pos = ret.size() - 1 - 2;
-    auto tb_id_pos = ret.size() - 1 - 1;
-    auto db_id = std::stoll(ret[db_id_pos]);
-    auto tb_id = std::stoll(ret[tb_id_pos]);
-
-    return {db_id, tb_id};
-}
-
-Status WalReader::init() {
-    auto [db_id, tb_id] = parse_db_tb_from_wal_path(_file_name);
-    io::FileSystemSPtr fs;
-    RETURN_IF_ERROR(determine_wal_fs(db_id, tb_id, fs));
-    bool exists = false;
-    RETURN_IF_ERROR(fs->exists(_file_name, &exists));
-    if (!exists) {
-        LOG(WARNING) << "not exist wal= " << _file_name;
-        return Status::NotFound("wal {} doesn't exist", _file_name);
-    }
-    RETURN_IF_ERROR(fs->open_file(_file_name, &file_reader));
-
+Status WalReader::init_reader(const TupleDescriptor* tuple_descriptor) {
+    _tuple_descriptor = tuple_descriptor;
+    RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_path(_wal_id, 
_wal_path));
+    _wal_reader = std::make_shared<doris::WalFileReader>(_wal_path);
+    RETURN_IF_ERROR(_wal_reader->init());
     return Status::OK();
 }
 
-Status WalReader::finalize() {
-    if (file_reader) {
-        return file_reader->close();
-    }
-    return Status::OK();
-}
-
-Status WalReader::read_block(PBlock& block) {
-    if (_offset >= file_reader->size()) {
-        return Status::EndOfFile("end of wal file");
+Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    //read src block
+    PBlock pblock;
+    auto st = _wal_reader->read_block(pblock);
+    if (st.is<ErrorCode::END_OF_FILE>()) {
+        LOG(INFO) << "read eof on wal:" << _wal_path;
+        *read_rows = 0;
+        *eof = true;
+        return Status::OK();
     }
-    size_t bytes_read = 0;
-    uint8_t row_len_buf[WalWriter::LENGTH_SIZE];
-    RETURN_IF_ERROR(
-            file_reader->read_at(_offset, {row_len_buf, 
WalWriter::LENGTH_SIZE}, &bytes_read));
-    _offset += WalWriter::LENGTH_SIZE;
-    size_t block_len = decode_fixed64_le(row_len_buf);
-    if (block_len == 0) {
-        return Status::DataQualityError("fail to read wal {} ,block is empty", 
_file_name);
+    if (!st.ok()) {
+        LOG(WARNING) << "Failed to read wal on path = " << _wal_path;
+        return st;
     }
-    if (_offset == file_reader->size()) {
-        LOG(WARNING) << "need read block with length=" << block_len << ", but 
offset=" << _offset
-                     << " reached end of WAL (path=" << _file_name
-                     << ", size=" << file_reader->size() << ")";
-        return Status::EndOfFile("end of wal file");
+    int be_exec_version = pblock.has_be_exec_version() ? 
pblock.be_exec_version() : 0;
+    if (!BeExecVersionManager::check_be_exec_version(be_exec_version)) {
+        return Status::DataQualityError("check be exec version fail when 
reading wal file {}",
+                                        _wal_path);
     }
-    // read block
-    std::string block_buf;
-    block_buf.resize(block_len);
-    RETURN_IF_ERROR(file_reader->read_at(_offset, {block_buf.c_str(), 
block_len}, &bytes_read));
-    RETURN_IF_ERROR(_deserialize(block, block_buf, block_len, bytes_read));
-    _offset += block_len;
-    // checksum
-    uint8_t checksum_len_buf[WalWriter::CHECKSUM_SIZE];
-    RETURN_IF_ERROR(file_reader->read_at(_offset, {checksum_len_buf, 
WalWriter::CHECKSUM_SIZE},
-                                         &bytes_read));
-    _offset += WalWriter::CHECKSUM_SIZE;
-    uint32_t checksum = decode_fixed32_le(checksum_len_buf);
-    RETURN_IF_ERROR(_check_checksum(block_buf.data(), block_len, checksum));
-    return Status::OK();
-}
-
-Status WalReader::read_header(uint32_t& version, std::string& col_ids) {
-    if (file_reader->size() == 0) {
-        return Status::DataQualityError("empty file");
-    }
-    size_t bytes_read = 0;
-    std::string magic_str;
-    magic_str.resize(k_wal_magic_length);
-    RETURN_IF_ERROR(file_reader->read_at(_offset, magic_str, &bytes_read));
-    if (strcmp(magic_str.c_str(), k_wal_magic) != 0) {
-        return Status::Corruption("Bad wal file {}: magic number not match", 
_file_name);
+    Block src_block;
+    size_t uncompressed_size = 0;
+    int64_t uncompressed_time = 0;
+    RETURN_IF_ERROR(src_block.deserialize(pblock, &uncompressed_size, 
&uncompressed_time));
+    //convert to dst block
+    Block dst_block;
+    int index = 0;
+    auto output_block_columns = block->get_columns_with_type_and_name();
+    size_t output_block_column_size = output_block_columns.size();
+    TEST_SYNC_POINT_CALLBACK("WalReader::set_column_id_count", 
&_column_id_count);
+    TEST_SYNC_POINT_CALLBACK("WalReader::set_out_block_column_size", 
&output_block_column_size);
+    if (_column_id_count != src_block.columns() ||
+        output_block_column_size != _tuple_descriptor->slots().size()) {
+        return Status::InternalError(
+                "not equal wal _column_id_count={} vs wal block columns 
size={}, "
+                "output block columns size={} vs tuple_descriptor size={}",
+                std::to_string(_column_id_count), 
std::to_string(src_block.columns()),
+                std::to_string(output_block_column_size),
+                std::to_string(_tuple_descriptor->slots().size()));
     }
-    _offset += k_wal_magic_length;
-    uint8_t version_buf[WalWriter::VERSION_SIZE];
-    RETURN_IF_ERROR(
-            file_reader->read_at(_offset, {version_buf, 
WalWriter::VERSION_SIZE}, &bytes_read));
-    _offset += WalWriter::VERSION_SIZE;
-    version = decode_fixed32_le(version_buf);
-    uint8_t len_buf[WalWriter::LENGTH_SIZE];
-    RETURN_IF_ERROR(file_reader->read_at(_offset, {len_buf, 
WalWriter::LENGTH_SIZE}, &bytes_read));
-    _offset += WalWriter::LENGTH_SIZE;
-    size_t len = decode_fixed64_le(len_buf);
-    col_ids.resize(len);
-    RETURN_IF_ERROR(file_reader->read_at(_offset, col_ids, &bytes_read));
-    _offset += len;
-    if (len != bytes_read) {
-        return Status::InternalError("failed to read header expected= " + 
std::to_string(len) +
-                                     ",actually=" + 
std::to_string(bytes_read));
+    for (auto* slot_desc : _tuple_descriptor->slots()) {
+        auto pos = _column_pos_map[slot_desc->col_unique_id()];
+        if (pos >= src_block.columns()) {
+            return Status::InternalError("read wal {} fail, pos {}, columns 
size {}", _wal_path,
+                                         pos, src_block.columns());
+        }
+        vectorized::ColumnPtr column_ptr = 
src_block.get_by_position(pos).column;
+        if (!column_ptr && slot_desc->is_nullable()) {

Review Comment:
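   `make_nullable()` dereferences its argument, so calling it with a null
   `column_ptr` will crash. Note that the condition `!column_ptr && slot_desc->is_nullable()`
   only reaches `make_nullable()` when the pointer is null, so this branch can only crash;
   the condition looks inverted (presumably the intent was to wrap a present column for a
   nullable slot). If a WAL column can be absent/null here, build a column of the expected
   type and row count instead (all NULL when that type is nullable, default values otherwise);
   otherwise, remove the null check and assert the column is always present. Since
   `column_ptr` is an immutable `ColumnPtr`, fill a mutable column first and then move it in:
   ```suggestion
           if (!column_ptr) {
               // WAL column is absent; create a column of the expected type and row count
               // (all NULL when the type is nullable).
               auto& type = output_block_columns[index].type;
               auto mutable_column = type->create_column();
               mutable_column->insert_many_defaults(src_block.rows());
               column_ptr = std::move(mutable_column);
           } else if (slot_desc->is_nullable()) {
   ```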



##########
be/src/load/group_commit/wal/wal_reader.h:
##########
@@ -16,32 +16,41 @@
 // under the License.
 
 #pragma once
+#include "format/generic_reader.h"
+#include "load/group_commit/wal/wal_file_reader.h"
+#include "runtime/descriptors.h"
 
-#include <gen_cpp/internal_service.pb.h>
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+struct ScannerCounter;
+class WalReader : public GenericReader {
+    ENABLE_FACTORY_CREATOR(WalReader);
 
-#include "common/status.h"
-#include "io/fs/file_reader_writer_fwd.h"
-
-namespace doris {
-
-class WalReader {
 public:
-    explicit WalReader(const std::string& file_name);
-    ~WalReader();
-
-    Status init();
-    Status finalize();
-
-    Status read_block(PBlock& block);
-    Status read_header(uint32_t& version, std::string& col_ids);
+    WalReader(RuntimeState* state);
+    ~WalReader() override = default;
+    Status init_reader(const TupleDescriptor* tuple_descriptor);
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;

Review Comment:
   `WalReader`'s public API uses `RuntimeState*`, but this header neither declares
   `RuntimeState` nor includes `runtime/runtime_state.h`. This can break any
   translation unit that includes this header without having already seen a declaration
   of `RuntimeState` (e.g. `wal_manager.h`, which includes this header). Add a forward
   declaration `namespace doris { class RuntimeState; }` ahead of the
   `doris::vectorized` namespace (preferred), or include the defining header.



##########
be/src/load/group_commit/wal/wal_reader.cpp:
##########
@@ -15,144 +15,110 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "load/group_commit/wal/wal_reader.h"
+#include "wal_reader.h"
 
-#include <crc32c/crc32c.h>
+#include <absl/strings/str_split.h>
 
-#include <string_view>
-#include <utility>
+#include "agent/be_exec_version_manager.h"
+#include "common/logging.h"
+#include "core/block/block.h"
+#include "cpp/sync_point.h"
+#include "load/group_commit/wal/wal_manager.h"
+#include "runtime/runtime_state.h"
 
-#include "common/status.h"
-#include "io/fs/file_reader.h"
-#include "io/fs/file_system.h"
-#include "io/fs/path.h"
-#include "load/group_commit/wal/wal_writer.h"
-#include "util/coding.h"
-#include "util/string_util.h"
-
-namespace doris {
-
-WalReader::WalReader(const std::string& file_name) : _file_name(file_name), 
_offset(0) {}
-
-WalReader::~WalReader() = default;
-Status WalReader::_deserialize(PBlock& block, const std::string& buf, size_t 
block_len,
-                               size_t bytes_read) {
-    if (UNLIKELY(!block.ParseFromString(buf))) {
-        return Status::InternalError(
-                "failed to deserialize row, file_size=" + 
std::to_string(file_reader->size()) +
-                ", read_offset=" + std::to_string(_offset) + +", block_bytes=" 
+
-                std::to_string(block_len) + ", read_block_bytes=" + 
std::to_string(bytes_read));
-    }
-    return Status::OK();
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+WalReader::WalReader(RuntimeState* state) : _state(state) {
+    _wal_id = state->wal_id();
 }
 
-std::pair<int64_t, int64_t> parse_db_tb_from_wal_path(const std::string& 
wal_path) {
-    auto ret = split(wal_path, "/");
-    DCHECK_GT(ret.size(), 3);
-    auto db_id_pos = ret.size() - 1 - 2;
-    auto tb_id_pos = ret.size() - 1 - 1;
-    auto db_id = std::stoll(ret[db_id_pos]);
-    auto tb_id = std::stoll(ret[tb_id_pos]);
-
-    return {db_id, tb_id};
-}
-
-Status WalReader::init() {
-    auto [db_id, tb_id] = parse_db_tb_from_wal_path(_file_name);
-    io::FileSystemSPtr fs;
-    RETURN_IF_ERROR(determine_wal_fs(db_id, tb_id, fs));
-    bool exists = false;
-    RETURN_IF_ERROR(fs->exists(_file_name, &exists));
-    if (!exists) {
-        LOG(WARNING) << "not exist wal= " << _file_name;
-        return Status::NotFound("wal {} doesn't exist", _file_name);
-    }
-    RETURN_IF_ERROR(fs->open_file(_file_name, &file_reader));
-
+Status WalReader::init_reader(const TupleDescriptor* tuple_descriptor) {
+    _tuple_descriptor = tuple_descriptor;
+    RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_path(_wal_id, 
_wal_path));
+    _wal_reader = std::make_shared<doris::WalFileReader>(_wal_path);
+    RETURN_IF_ERROR(_wal_reader->init());
     return Status::OK();
 }
 
-Status WalReader::finalize() {
-    if (file_reader) {
-        return file_reader->close();
-    }
-    return Status::OK();
-}
-
-Status WalReader::read_block(PBlock& block) {
-    if (_offset >= file_reader->size()) {
-        return Status::EndOfFile("end of wal file");
+Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    //read src block
+    PBlock pblock;
+    auto st = _wal_reader->read_block(pblock);
+    if (st.is<ErrorCode::END_OF_FILE>()) {
+        LOG(INFO) << "read eof on wal:" << _wal_path;
+        *read_rows = 0;
+        *eof = true;
+        return Status::OK();
     }
-    size_t bytes_read = 0;
-    uint8_t row_len_buf[WalWriter::LENGTH_SIZE];
-    RETURN_IF_ERROR(
-            file_reader->read_at(_offset, {row_len_buf, 
WalWriter::LENGTH_SIZE}, &bytes_read));
-    _offset += WalWriter::LENGTH_SIZE;
-    size_t block_len = decode_fixed64_le(row_len_buf);
-    if (block_len == 0) {
-        return Status::DataQualityError("fail to read wal {} ,block is empty", 
_file_name);
+    if (!st.ok()) {
+        LOG(WARNING) << "Failed to read wal on path = " << _wal_path;
+        return st;
     }
-    if (_offset == file_reader->size()) {
-        LOG(WARNING) << "need read block with length=" << block_len << ", but 
offset=" << _offset
-                     << " reached end of WAL (path=" << _file_name
-                     << ", size=" << file_reader->size() << ")";
-        return Status::EndOfFile("end of wal file");
+    int be_exec_version = pblock.has_be_exec_version() ? 
pblock.be_exec_version() : 0;
+    if (!BeExecVersionManager::check_be_exec_version(be_exec_version)) {
+        return Status::DataQualityError("check be exec version fail when 
reading wal file {}",
+                                        _wal_path);
     }
-    // read block
-    std::string block_buf;
-    block_buf.resize(block_len);
-    RETURN_IF_ERROR(file_reader->read_at(_offset, {block_buf.c_str(), 
block_len}, &bytes_read));
-    RETURN_IF_ERROR(_deserialize(block, block_buf, block_len, bytes_read));
-    _offset += block_len;
-    // checksum
-    uint8_t checksum_len_buf[WalWriter::CHECKSUM_SIZE];
-    RETURN_IF_ERROR(file_reader->read_at(_offset, {checksum_len_buf, 
WalWriter::CHECKSUM_SIZE},
-                                         &bytes_read));
-    _offset += WalWriter::CHECKSUM_SIZE;
-    uint32_t checksum = decode_fixed32_le(checksum_len_buf);
-    RETURN_IF_ERROR(_check_checksum(block_buf.data(), block_len, checksum));
-    return Status::OK();
-}
-
-Status WalReader::read_header(uint32_t& version, std::string& col_ids) {
-    if (file_reader->size() == 0) {
-        return Status::DataQualityError("empty file");
-    }
-    size_t bytes_read = 0;
-    std::string magic_str;
-    magic_str.resize(k_wal_magic_length);
-    RETURN_IF_ERROR(file_reader->read_at(_offset, magic_str, &bytes_read));
-    if (strcmp(magic_str.c_str(), k_wal_magic) != 0) {
-        return Status::Corruption("Bad wal file {}: magic number not match", 
_file_name);
+    Block src_block;
+    size_t uncompressed_size = 0;
+    int64_t uncompressed_time = 0;
+    RETURN_IF_ERROR(src_block.deserialize(pblock, &uncompressed_size, 
&uncompressed_time));
+    //convert to dst block
+    Block dst_block;
+    int index = 0;
+    auto output_block_columns = block->get_columns_with_type_and_name();
+    size_t output_block_column_size = output_block_columns.size();
+    TEST_SYNC_POINT_CALLBACK("WalReader::set_column_id_count", 
&_column_id_count);
+    TEST_SYNC_POINT_CALLBACK("WalReader::set_out_block_column_size", 
&output_block_column_size);
+    if (_column_id_count != src_block.columns() ||
+        output_block_column_size != _tuple_descriptor->slots().size()) {
+        return Status::InternalError(
+                "not equal wal _column_id_count={} vs wal block columns 
size={}, "
+                "output block columns size={} vs tuple_descriptor size={}",
+                std::to_string(_column_id_count), 
std::to_string(src_block.columns()),
+                std::to_string(output_block_column_size),
+                std::to_string(_tuple_descriptor->slots().size()));
     }
-    _offset += k_wal_magic_length;
-    uint8_t version_buf[WalWriter::VERSION_SIZE];
-    RETURN_IF_ERROR(
-            file_reader->read_at(_offset, {version_buf, 
WalWriter::VERSION_SIZE}, &bytes_read));
-    _offset += WalWriter::VERSION_SIZE;
-    version = decode_fixed32_le(version_buf);
-    uint8_t len_buf[WalWriter::LENGTH_SIZE];
-    RETURN_IF_ERROR(file_reader->read_at(_offset, {len_buf, 
WalWriter::LENGTH_SIZE}, &bytes_read));
-    _offset += WalWriter::LENGTH_SIZE;
-    size_t len = decode_fixed64_le(len_buf);
-    col_ids.resize(len);
-    RETURN_IF_ERROR(file_reader->read_at(_offset, col_ids, &bytes_read));
-    _offset += len;
-    if (len != bytes_read) {
-        return Status::InternalError("failed to read header expected= " + 
std::to_string(len) +
-                                     ",actually=" + 
std::to_string(bytes_read));
+    for (auto* slot_desc : _tuple_descriptor->slots()) {
+        auto pos = _column_pos_map[slot_desc->col_unique_id()];

Review Comment:
   Using `_column_pos_map[slot_desc->col_unique_id()]` inserts a default entry
   (pos=0) whenever the WAL header does not contain that column id. The reader then
   silently reads the wrong column and mutates the map as a side effect. Use a lookup
   that does not mutate the map, and return an error when the id is absent. Note that
   `missing_cols` is only available in `get_columns()`; a column that may legitimately
   be missing has to be reported there, not from `get_next_block()`.
   ```suggestion
           auto it = _column_pos_map.find(slot_desc->col_unique_id());
           if (it == _column_pos_map.end()) {
               return Status::InternalError("read wal {} fail, column id {} not found in header",
                                            _wal_path, slot_desc->col_unique_id());
           }
           auto pos = it->second;
   ```



##########
be/src/load/group_commit/wal/wal_reader.cpp:
##########
@@ -15,144 +15,110 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "load/group_commit/wal/wal_reader.h"
+#include "wal_reader.h"
 
-#include <crc32c/crc32c.h>
+#include <absl/strings/str_split.h>
 
-#include <string_view>
-#include <utility>
+#include "agent/be_exec_version_manager.h"
+#include "common/logging.h"
+#include "core/block/block.h"
+#include "cpp/sync_point.h"
+#include "load/group_commit/wal/wal_manager.h"
+#include "runtime/runtime_state.h"
 
-#include "common/status.h"
-#include "io/fs/file_reader.h"
-#include "io/fs/file_system.h"
-#include "io/fs/path.h"
-#include "load/group_commit/wal/wal_writer.h"
-#include "util/coding.h"
-#include "util/string_util.h"
-
-namespace doris {
-
-WalReader::WalReader(const std::string& file_name) : _file_name(file_name), 
_offset(0) {}
-
-WalReader::~WalReader() = default;
-Status WalReader::_deserialize(PBlock& block, const std::string& buf, size_t 
block_len,
-                               size_t bytes_read) {
-    if (UNLIKELY(!block.ParseFromString(buf))) {
-        return Status::InternalError(
-                "failed to deserialize row, file_size=" + 
std::to_string(file_reader->size()) +
-                ", read_offset=" + std::to_string(_offset) + +", block_bytes=" 
+
-                std::to_string(block_len) + ", read_block_bytes=" + 
std::to_string(bytes_read));
-    }
-    return Status::OK();
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+WalReader::WalReader(RuntimeState* state) : _state(state) {
+    _wal_id = state->wal_id();
 }
 
-std::pair<int64_t, int64_t> parse_db_tb_from_wal_path(const std::string& 
wal_path) {
-    auto ret = split(wal_path, "/");
-    DCHECK_GT(ret.size(), 3);
-    auto db_id_pos = ret.size() - 1 - 2;
-    auto tb_id_pos = ret.size() - 1 - 1;
-    auto db_id = std::stoll(ret[db_id_pos]);
-    auto tb_id = std::stoll(ret[tb_id_pos]);
-
-    return {db_id, tb_id};
-}
-
-Status WalReader::init() {
-    auto [db_id, tb_id] = parse_db_tb_from_wal_path(_file_name);
-    io::FileSystemSPtr fs;
-    RETURN_IF_ERROR(determine_wal_fs(db_id, tb_id, fs));
-    bool exists = false;
-    RETURN_IF_ERROR(fs->exists(_file_name, &exists));
-    if (!exists) {
-        LOG(WARNING) << "not exist wal= " << _file_name;
-        return Status::NotFound("wal {} doesn't exist", _file_name);
-    }
-    RETURN_IF_ERROR(fs->open_file(_file_name, &file_reader));
-
+Status WalReader::init_reader(const TupleDescriptor* tuple_descriptor) {
+    _tuple_descriptor = tuple_descriptor;
+    RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_path(_wal_id, 
_wal_path));
+    _wal_reader = std::make_shared<doris::WalFileReader>(_wal_path);
+    RETURN_IF_ERROR(_wal_reader->init());
     return Status::OK();
 }
 
-Status WalReader::finalize() {
-    if (file_reader) {
-        return file_reader->close();
-    }
-    return Status::OK();
-}
-
-Status WalReader::read_block(PBlock& block) {
-    if (_offset >= file_reader->size()) {
-        return Status::EndOfFile("end of wal file");
+Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    //read src block
+    PBlock pblock;
+    auto st = _wal_reader->read_block(pblock);
+    if (st.is<ErrorCode::END_OF_FILE>()) {
+        LOG(INFO) << "read eof on wal:" << _wal_path;
+        *read_rows = 0;
+        *eof = true;
+        return Status::OK();
     }
-    size_t bytes_read = 0;
-    uint8_t row_len_buf[WalWriter::LENGTH_SIZE];
-    RETURN_IF_ERROR(
-            file_reader->read_at(_offset, {row_len_buf, 
WalWriter::LENGTH_SIZE}, &bytes_read));
-    _offset += WalWriter::LENGTH_SIZE;
-    size_t block_len = decode_fixed64_le(row_len_buf);
-    if (block_len == 0) {
-        return Status::DataQualityError("fail to read wal {} ,block is empty", 
_file_name);
+    if (!st.ok()) {
+        LOG(WARNING) << "Failed to read wal on path = " << _wal_path;
+        return st;
     }
-    if (_offset == file_reader->size()) {
-        LOG(WARNING) << "need read block with length=" << block_len << ", but 
offset=" << _offset
-                     << " reached end of WAL (path=" << _file_name
-                     << ", size=" << file_reader->size() << ")";
-        return Status::EndOfFile("end of wal file");
+    int be_exec_version = pblock.has_be_exec_version() ? 
pblock.be_exec_version() : 0;
+    if (!BeExecVersionManager::check_be_exec_version(be_exec_version)) {
+        return Status::DataQualityError("check be exec version fail when 
reading wal file {}",
+                                        _wal_path);
     }
-    // read block
-    std::string block_buf;
-    block_buf.resize(block_len);
-    RETURN_IF_ERROR(file_reader->read_at(_offset, {block_buf.c_str(), 
block_len}, &bytes_read));
-    RETURN_IF_ERROR(_deserialize(block, block_buf, block_len, bytes_read));
-    _offset += block_len;
-    // checksum
-    uint8_t checksum_len_buf[WalWriter::CHECKSUM_SIZE];
-    RETURN_IF_ERROR(file_reader->read_at(_offset, {checksum_len_buf, 
WalWriter::CHECKSUM_SIZE},
-                                         &bytes_read));
-    _offset += WalWriter::CHECKSUM_SIZE;
-    uint32_t checksum = decode_fixed32_le(checksum_len_buf);
-    RETURN_IF_ERROR(_check_checksum(block_buf.data(), block_len, checksum));
-    return Status::OK();
-}
-
-Status WalReader::read_header(uint32_t& version, std::string& col_ids) {
-    if (file_reader->size() == 0) {
-        return Status::DataQualityError("empty file");
-    }
-    size_t bytes_read = 0;
-    std::string magic_str;
-    magic_str.resize(k_wal_magic_length);
-    RETURN_IF_ERROR(file_reader->read_at(_offset, magic_str, &bytes_read));
-    if (strcmp(magic_str.c_str(), k_wal_magic) != 0) {
-        return Status::Corruption("Bad wal file {}: magic number not match", 
_file_name);
+    Block src_block;
+    size_t uncompressed_size = 0;
+    int64_t uncompressed_time = 0;
+    RETURN_IF_ERROR(src_block.deserialize(pblock, &uncompressed_size, 
&uncompressed_time));
+    //convert to dst block
+    Block dst_block;
+    int index = 0;
+    auto output_block_columns = block->get_columns_with_type_and_name();
+    size_t output_block_column_size = output_block_columns.size();
+    TEST_SYNC_POINT_CALLBACK("WalReader::set_column_id_count", 
&_column_id_count);
+    TEST_SYNC_POINT_CALLBACK("WalReader::set_out_block_column_size", 
&output_block_column_size);
+    if (_column_id_count != src_block.columns() ||
+        output_block_column_size != _tuple_descriptor->slots().size()) {
+        return Status::InternalError(
+                "not equal wal _column_id_count={} vs wal block columns 
size={}, "
+                "output block columns size={} vs tuple_descriptor size={}",
+                std::to_string(_column_id_count), 
std::to_string(src_block.columns()),
+                std::to_string(output_block_column_size),
+                std::to_string(_tuple_descriptor->slots().size()));
     }
-    _offset += k_wal_magic_length;
-    uint8_t version_buf[WalWriter::VERSION_SIZE];
-    RETURN_IF_ERROR(
-            file_reader->read_at(_offset, {version_buf, 
WalWriter::VERSION_SIZE}, &bytes_read));
-    _offset += WalWriter::VERSION_SIZE;
-    version = decode_fixed32_le(version_buf);
-    uint8_t len_buf[WalWriter::LENGTH_SIZE];
-    RETURN_IF_ERROR(file_reader->read_at(_offset, {len_buf, 
WalWriter::LENGTH_SIZE}, &bytes_read));
-    _offset += WalWriter::LENGTH_SIZE;
-    size_t len = decode_fixed64_le(len_buf);
-    col_ids.resize(len);
-    RETURN_IF_ERROR(file_reader->read_at(_offset, col_ids, &bytes_read));
-    _offset += len;
-    if (len != bytes_read) {
-        return Status::InternalError("failed to read header expected= " + 
std::to_string(len) +
-                                     ",actually=" + 
std::to_string(bytes_read));
+    for (auto* slot_desc : _tuple_descriptor->slots()) {
+        auto pos = _column_pos_map[slot_desc->col_unique_id()];
+        if (pos >= src_block.columns()) {
+            return Status::InternalError("read wal {} fail, pos {}, columns 
size {}", _wal_path,
+                                         pos, src_block.columns());
+        }
+        vectorized::ColumnPtr column_ptr = 
src_block.get_by_position(pos).column;
+        if (!column_ptr && slot_desc->is_nullable()) {
+            column_ptr = make_nullable(column_ptr);
+        }
+        dst_block.insert(index, vectorized::ColumnWithTypeAndName(
+                                        std::move(column_ptr), 
output_block_columns[index].type,
+                                        output_block_columns[index].name));
+        index++;
     }
+    block->swap(dst_block);
+    *read_rows = block->rows();
+    VLOG_DEBUG << "read block rows:" << *read_rows;
     return Status::OK();
 }
 
-Status WalReader::_check_checksum(const char* binary, size_t size, uint32_t 
checksum) {
-    uint32_t computed_checksum = crc32c::Crc32c(binary, size);
-    if (LIKELY(computed_checksum == checksum)) {
-        return Status::OK();
+Status WalReader::get_columns(std::unordered_map<std::string, DataTypePtr>* 
name_to_type,
+                              std::unordered_set<std::string>* missing_cols) {
+    std::string col_ids;
+    RETURN_IF_ERROR(_wal_reader->read_header(_version, col_ids));
+    std::vector<std::string> column_id_vector =
+            absl::StrSplit(col_ids, ",", absl::SkipWhitespace());
+    _column_id_count = column_id_vector.size();
+    try {
+        int64_t pos = 0;
+        for (auto col_id_str : column_id_vector) {
+            auto col_id = std::strtoll(col_id_str.c_str(), nullptr, 10);
+            _column_pos_map.emplace(col_id, pos);
+            pos++;
+        }
+    } catch (const std::invalid_argument& e) {
+        return Status::InvalidArgument("Invalid format, {}", e.what());

Review Comment:
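   `std::strtoll` never throws `std::invalid_argument`, so this try/catch is dead code.
   On a malformed token, `strtoll` returns 0, or the value of its leading numeric prefix.
   On overflow it returns `LLONG_MAX`/`LLONG_MIN` and sets `errno` to `ERANGE`. Either
   way, a bad column id is accepted silently. If you need validation, parse with an
   end-pointer and `errno` check (or `std::from_chars`) and return `InvalidArgument`
   when the entire token is not a valid integer. Also consider clearing
   `_column_pos_map` before filling it if `get_columns()` can be called more than once:
   `emplace` does not overwrite existing keys, so stale positions would take precedence.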



##########
be/src/load/group_commit/wal/wal_file_reader.cpp:
##########
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "load/group_commit/wal/wal_file_reader.h"
+
+#include <crc32c/crc32c.h>
+
+#include <string_view>
+#include <utility>
+
+#include "common/status.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/file_system.h"
+#include "io/fs/path.h"
+#include "load/group_commit/wal/wal_writer.h"
+#include "util/coding.h"
+#include "util/string_util.h"
+
+namespace doris {
+
+WalFileReader::WalFileReader(const std::string& file_name) : 
_file_name(file_name), _offset(0) {}
+
+WalFileReader::~WalFileReader() = default;
+Status WalFileReader::_deserialize(PBlock& block, const std::string& buf, 
size_t block_len,
+                                   size_t bytes_read) {
+    if (UNLIKELY(!block.ParseFromString(buf))) {
+        return Status::InternalError(
+                "failed to deserialize row, file_size=" + 
std::to_string(file_reader->size()) +
+                ", read_offset=" + std::to_string(_offset) + +", block_bytes=" 
+

Review Comment:
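   `_deserialize()` builds its error string with
   `... + std::to_string(_offset) + +", block_bytes=" + ...` (a doubled `+`). This
   is not a compile error: the second `+` is unary plus applied to the string literal,
   which just yields a `const char*`, so the concatenation still works. It is clearly a
   typo, though, carried over from the removed `WalReader::_deserialize`. Remove the
   stray `+` (or switch to a formatter to avoid manual concatenation).
   ```suggestion
                   ", read_offset=" + std::to_string(_offset) + ", block_bytes=" +
   ```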



##########
be/src/load/group_commit/wal/wal_file_reader.cpp:
##########
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "load/group_commit/wal/wal_file_reader.h"
+
+#include <crc32c/crc32c.h>
+
+#include <string_view>
+#include <utility>
+
+#include "common/status.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/file_system.h"
+#include "io/fs/path.h"
+#include "load/group_commit/wal/wal_writer.h"
+#include "util/coding.h"
+#include "util/string_util.h"
+
+namespace doris {
+
+WalFileReader::WalFileReader(const std::string& file_name) : 
_file_name(file_name), _offset(0) {}
+
+WalFileReader::~WalFileReader() = default;
+Status WalFileReader::_deserialize(PBlock& block, const std::string& buf, 
size_t block_len,
+                                   size_t bytes_read) {
+    if (UNLIKELY(!block.ParseFromString(buf))) {
+        return Status::InternalError(
+                "failed to deserialize row, file_size=" + 
std::to_string(file_reader->size()) +
+                ", read_offset=" + std::to_string(_offset) + +", block_bytes=" 
+
+                std::to_string(block_len) + ", read_block_bytes=" + 
std::to_string(bytes_read));
+    }
+    return Status::OK();
+}
+
+std::pair<int64_t, int64_t> parse_db_tb_from_wal_path(const std::string& 
wal_path) {
+    auto ret = split(wal_path, "/");
+    DCHECK_GT(ret.size(), 3);
+    auto db_id_pos = ret.size() - 1 - 2;
+    auto tb_id_pos = ret.size() - 1 - 1;
+    auto db_id = std::stoll(ret[db_id_pos]);
+    auto tb_id = std::stoll(ret[tb_id_pos]);
+
+    return {db_id, tb_id};
+}
+
+Status WalFileReader::init() {
+    auto [db_id, tb_id] = parse_db_tb_from_wal_path(_file_name);
+    io::FileSystemSPtr fs;
+    RETURN_IF_ERROR(determine_wal_fs(db_id, tb_id, fs));
+    bool exists = false;
+    RETURN_IF_ERROR(fs->exists(_file_name, &exists));
+    if (!exists) {
+        LOG(WARNING) << "not exist wal= " << _file_name;
+        return Status::NotFound("wal {} doesn't exist", _file_name);
+    }
+    RETURN_IF_ERROR(fs->open_file(_file_name, &file_reader));
+
+    return Status::OK();
+}
+
+Status WalFileReader::finalize() {
+    if (file_reader) {
+        return file_reader->close();
+    }
+    return Status::OK();
+}
+
+Status WalFileReader::read_block(PBlock& block) {
+    if (_offset >= file_reader->size()) {
+        return Status::EndOfFile("end of wal file");
+    }
+    size_t bytes_read = 0;
+    uint8_t row_len_buf[WalWriter::LENGTH_SIZE];
+    RETURN_IF_ERROR(
+            file_reader->read_at(_offset, {row_len_buf, 
WalWriter::LENGTH_SIZE}, &bytes_read));
+    _offset += WalWriter::LENGTH_SIZE;
+    size_t block_len = decode_fixed64_le(row_len_buf);
+    if (block_len == 0) {
+        return Status::DataQualityError("fail to read wal {} ,block is empty", 
_file_name);
+    }
+    if (_offset == file_reader->size()) {
+        LOG(WARNING) << "need read block with length=" << block_len << ", but 
offset=" << _offset
+                     << " reached end of WAL (path=" << _file_name
+                     << ", size=" << file_reader->size() << ")";
+        return Status::EndOfFile("end of wal file");
+    }
+    // read block
+    std::string block_buf;
+    block_buf.resize(block_len);
+    RETURN_IF_ERROR(file_reader->read_at(_offset, {block_buf.c_str(), 
block_len}, &bytes_read));

Review Comment:
   `read_at()` writes into the provided buffer, but `block_buf.c_str()` returns 
a `const char*`. The C++ standard forbids modifying the characters through that 
pointer. Passing it here presumably only works because the buffer/slice type 
casts away `const` internally, so the write is undefined behavior. Use a mutable 
pointer to the string's storage instead: since C++17, the non-const 
`std::string::data()` overload returns `char*`.
   ```suggestion
       RETURN_IF_ERROR(file_reader->read_at(_offset, {block_buf.data(), 
block_len}, &bytes_read));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to