This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 798d64411e7 [fix](wal) rename wal_reader and move to wal directory (#61135)
798d64411e7 is described below
commit 798d64411e7d6e121e1cca136e9b62b738fef78a
Author: meiyi <[email protected]>
AuthorDate: Mon Mar 9 11:33:04 2026 +0800
[fix](wal) rename wal_reader and move to wal directory (#61135)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
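Problem Summary:
The scan-side reader `doris::vectorized::WalReader` (a `GenericReader`) is moved from
`be/src/format/wal/wal_reader.{h,cpp}` into `be/src/load/group_commit/wal/wal_reader.{h,cpp}`.
The low-level WAL file reader previously named `doris::WalReader` in that directory is
renamed to `doris::WalFileReader` (`wal_file_reader.{h,cpp}`). Includes in
`file_scanner.cpp`, `wal_manager.cpp`, `wal_table.cpp` and `wal_reader_writer_test.cpp`
are updated accordingly; the reader logic itself is unchanged.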
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here, e.g. https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add the pick label for each branch this PR should be merged into -->
---
be/src/exec/scan/file_scanner.cpp | 2 +-
be/src/format/wal/wal_reader.cpp | 124 ------------
be/src/format/wal/wal_reader.h | 56 ------
.../wal/{wal_reader.cpp => wal_file_reader.cpp} | 20 +-
.../wal/{wal_reader.h => wal_file_reader.h} | 6 +-
be/src/load/group_commit/wal/wal_manager.cpp | 2 +-
be/src/load/group_commit/wal/wal_reader.cpp | 210 +++++++++------------
be/src/load/group_commit/wal/wal_reader.h | 55 +++---
be/src/load/group_commit/wal/wal_table.cpp | 3 +-
be/test/format/wal/wal_reader_writer_test.cpp | 4 +-
10 files changed, 139 insertions(+), 343 deletions(-)
diff --git a/be/src/exec/scan/file_scanner.cpp
b/be/src/exec/scan/file_scanner.cpp
index c13185f55de..04902e58b6e 100644
--- a/be/src/exec/scan/file_scanner.cpp
+++ b/be/src/exec/scan/file_scanner.cpp
@@ -78,8 +78,8 @@
#include "format/table/transactional_hive_reader.h"
#include "format/table/trino_connector_jni_reader.h"
#include "format/text/text_reader.h"
-#include "format/wal/wal_reader.h"
#include "io/cache/block_file_cache_profile.h"
+#include "load/group_commit/wal/wal_reader.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_profile.h"
#include "runtime/runtime_state.h"
diff --git a/be/src/format/wal/wal_reader.cpp b/be/src/format/wal/wal_reader.cpp
deleted file mode 100644
index 78d5e855659..00000000000
--- a/be/src/format/wal/wal_reader.cpp
+++ /dev/null
@@ -1,124 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "format/wal/wal_reader.h"
-
-#include <absl/strings/str_split.h>
-
-#include "agent/be_exec_version_manager.h"
-#include "common/logging.h"
-#include "core/block/block.h"
-#include "cpp/sync_point.h"
-#include "load/group_commit/wal/wal_manager.h"
-#include "runtime/runtime_state.h"
-
-namespace doris::vectorized {
-#include "common/compile_check_begin.h"
-WalReader::WalReader(RuntimeState* state) : _state(state) {
- _wal_id = state->wal_id();
-}
-
-Status WalReader::init_reader(const TupleDescriptor* tuple_descriptor) {
- _tuple_descriptor = tuple_descriptor;
- RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_path(_wal_id,
_wal_path));
- _wal_reader = std::make_shared<doris::WalReader>(_wal_path);
- RETURN_IF_ERROR(_wal_reader->init());
- return Status::OK();
-}
-
-Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
- //read src block
- PBlock pblock;
- auto st = _wal_reader->read_block(pblock);
- if (st.is<ErrorCode::END_OF_FILE>()) {
- LOG(INFO) << "read eof on wal:" << _wal_path;
- *read_rows = 0;
- *eof = true;
- return Status::OK();
- }
- if (!st.ok()) {
- LOG(WARNING) << "Failed to read wal on path = " << _wal_path;
- return st;
- }
- int be_exec_version = pblock.has_be_exec_version() ?
pblock.be_exec_version() : 0;
- if (!BeExecVersionManager::check_be_exec_version(be_exec_version)) {
- return Status::DataQualityError("check be exec version fail when
reading wal file {}",
- _wal_path);
- }
- Block src_block;
- size_t uncompressed_size = 0;
- int64_t uncompressed_time = 0;
- RETURN_IF_ERROR(src_block.deserialize(pblock, &uncompressed_size,
&uncompressed_time));
- //convert to dst block
- Block dst_block;
- int index = 0;
- auto output_block_columns = block->get_columns_with_type_and_name();
- size_t output_block_column_size = output_block_columns.size();
- TEST_SYNC_POINT_CALLBACK("WalReader::set_column_id_count",
&_column_id_count);
- TEST_SYNC_POINT_CALLBACK("WalReader::set_out_block_column_size",
&output_block_column_size);
- if (_column_id_count != src_block.columns() ||
- output_block_column_size != _tuple_descriptor->slots().size()) {
- return Status::InternalError(
- "not equal wal _column_id_count={} vs wal block columns
size={}, "
- "output block columns size={} vs tuple_descriptor size={}",
- std::to_string(_column_id_count),
std::to_string(src_block.columns()),
- std::to_string(output_block_column_size),
- std::to_string(_tuple_descriptor->slots().size()));
- }
- for (auto* slot_desc : _tuple_descriptor->slots()) {
- auto pos = _column_pos_map[slot_desc->col_unique_id()];
- if (pos >= src_block.columns()) {
- return Status::InternalError("read wal {} fail, pos {}, columns
size {}", _wal_path,
- pos, src_block.columns());
- }
- vectorized::ColumnPtr column_ptr =
src_block.get_by_position(pos).column;
- if (!column_ptr && slot_desc->is_nullable()) {
- column_ptr = make_nullable(column_ptr);
- }
- dst_block.insert(index, vectorized::ColumnWithTypeAndName(
- std::move(column_ptr),
output_block_columns[index].type,
- output_block_columns[index].name));
- index++;
- }
- block->swap(dst_block);
- *read_rows = block->rows();
- VLOG_DEBUG << "read block rows:" << *read_rows;
- return Status::OK();
-}
-
-Status WalReader::get_columns(std::unordered_map<std::string, DataTypePtr>*
name_to_type,
- std::unordered_set<std::string>* missing_cols) {
- std::string col_ids;
- RETURN_IF_ERROR(_wal_reader->read_header(_version, col_ids));
- std::vector<std::string> column_id_vector =
- absl::StrSplit(col_ids, ",", absl::SkipWhitespace());
- _column_id_count = column_id_vector.size();
- try {
- int64_t pos = 0;
- for (auto col_id_str : column_id_vector) {
- auto col_id = std::strtoll(col_id_str.c_str(), nullptr, 10);
- _column_pos_map.emplace(col_id, pos);
- pos++;
- }
- } catch (const std::invalid_argument& e) {
- return Status::InvalidArgument("Invalid format, {}", e.what());
- }
- return Status::OK();
-}
-
-#include "common/compile_check_end.h"
-} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/format/wal/wal_reader.h b/be/src/format/wal/wal_reader.h
deleted file mode 100644
index e2e9f02c248..00000000000
--- a/be/src/format/wal/wal_reader.h
+++ /dev/null
@@ -1,56 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-#include "format/generic_reader.h"
-#include "load/group_commit/wal/wal_reader.h"
-#include "runtime/descriptors.h"
-
-namespace doris::vectorized {
-#include "common/compile_check_begin.h"
-struct ScannerCounter;
-class WalReader : public GenericReader {
- ENABLE_FACTORY_CREATOR(WalReader);
-
-public:
- WalReader(RuntimeState* state);
- ~WalReader() override = default;
- Status init_reader(const TupleDescriptor* tuple_descriptor);
- Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
- Status get_columns(std::unordered_map<std::string, DataTypePtr>*
name_to_type,
- std::unordered_set<std::string>* missing_cols) override;
-
- Status close() override {
- if (_wal_reader) {
- return _wal_reader->finalize();
- }
- return Status::OK();
- }
-
-private:
- RuntimeState* _state = nullptr;
- int64_t _wal_id;
- std::string _wal_path;
- std::shared_ptr<doris::WalReader> _wal_reader = nullptr;
- const TupleDescriptor* _tuple_descriptor = nullptr;
- // column_id, column_pos
- std::map<int64_t, int64_t> _column_pos_map;
- int64_t _column_id_count;
- uint32_t _version = 0;
-};
-#include "common/compile_check_end.h"
-} // namespace doris::vectorized
diff --git a/be/src/load/group_commit/wal/wal_reader.cpp
b/be/src/load/group_commit/wal/wal_file_reader.cpp
similarity index 90%
copy from be/src/load/group_commit/wal/wal_reader.cpp
copy to be/src/load/group_commit/wal/wal_file_reader.cpp
index 9329fee0199..cd9430a6c18 100644
--- a/be/src/load/group_commit/wal/wal_reader.cpp
+++ b/be/src/load/group_commit/wal/wal_file_reader.cpp
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "load/group_commit/wal/wal_reader.h"
+#include "load/group_commit/wal/wal_file_reader.h"
#include <crc32c/crc32c.h>
@@ -32,11 +32,11 @@
namespace doris {
-WalReader::WalReader(const std::string& file_name) : _file_name(file_name),
_offset(0) {}
+WalFileReader::WalFileReader(const std::string& file_name) :
_file_name(file_name), _offset(0) {}
-WalReader::~WalReader() = default;
-Status WalReader::_deserialize(PBlock& block, const std::string& buf, size_t
block_len,
- size_t bytes_read) {
+WalFileReader::~WalFileReader() = default;
+Status WalFileReader::_deserialize(PBlock& block, const std::string& buf,
size_t block_len,
+ size_t bytes_read) {
if (UNLIKELY(!block.ParseFromString(buf))) {
return Status::InternalError(
"failed to deserialize row, file_size=" +
std::to_string(file_reader->size()) +
@@ -57,7 +57,7 @@ std::pair<int64_t, int64_t> parse_db_tb_from_wal_path(const
std::string& wal_pat
return {db_id, tb_id};
}
-Status WalReader::init() {
+Status WalFileReader::init() {
auto [db_id, tb_id] = parse_db_tb_from_wal_path(_file_name);
io::FileSystemSPtr fs;
RETURN_IF_ERROR(determine_wal_fs(db_id, tb_id, fs));
@@ -72,14 +72,14 @@ Status WalReader::init() {
return Status::OK();
}
-Status WalReader::finalize() {
+Status WalFileReader::finalize() {
if (file_reader) {
return file_reader->close();
}
return Status::OK();
}
-Status WalReader::read_block(PBlock& block) {
+Status WalFileReader::read_block(PBlock& block) {
if (_offset >= file_reader->size()) {
return Status::EndOfFile("end of wal file");
}
@@ -114,7 +114,7 @@ Status WalReader::read_block(PBlock& block) {
return Status::OK();
}
-Status WalReader::read_header(uint32_t& version, std::string& col_ids) {
+Status WalFileReader::read_header(uint32_t& version, std::string& col_ids) {
if (file_reader->size() == 0) {
return Status::DataQualityError("empty file");
}
@@ -145,7 +145,7 @@ Status WalReader::read_header(uint32_t& version,
std::string& col_ids) {
return Status::OK();
}
-Status WalReader::_check_checksum(const char* binary, size_t size, uint32_t
checksum) {
+Status WalFileReader::_check_checksum(const char* binary, size_t size,
uint32_t checksum) {
uint32_t computed_checksum = crc32c::Crc32c(binary, size);
if (LIKELY(computed_checksum == checksum)) {
return Status::OK();
diff --git a/be/src/load/group_commit/wal/wal_reader.h
b/be/src/load/group_commit/wal/wal_file_reader.h
similarity index 91%
copy from be/src/load/group_commit/wal/wal_reader.h
copy to be/src/load/group_commit/wal/wal_file_reader.h
index 10a1f1e2229..574f46f2cf8 100644
--- a/be/src/load/group_commit/wal/wal_reader.h
+++ b/be/src/load/group_commit/wal/wal_file_reader.h
@@ -24,10 +24,10 @@
namespace doris {
-class WalReader {
+class WalFileReader {
public:
- explicit WalReader(const std::string& file_name);
- ~WalReader();
+ explicit WalFileReader(const std::string& file_name);
+ ~WalFileReader();
Status init();
Status finalize();
diff --git a/be/src/load/group_commit/wal/wal_manager.cpp
b/be/src/load/group_commit/wal/wal_manager.cpp
index 962061a236b..06d009404f7 100644
--- a/be/src/load/group_commit/wal/wal_manager.cpp
+++ b/be/src/load/group_commit/wal/wal_manager.cpp
@@ -30,9 +30,9 @@
#include "common/config.h"
#include "common/status.h"
-#include "format/wal/wal_reader.h"
#include "io/fs/local_file_system.h"
#include "load/group_commit/wal/wal_dirs_info.h"
+#include "load/group_commit/wal/wal_reader.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "util/parse_util.h"
diff --git a/be/src/load/group_commit/wal/wal_reader.cpp
b/be/src/load/group_commit/wal/wal_reader.cpp
index 9329fee0199..a2ff97f1c02 100644
--- a/be/src/load/group_commit/wal/wal_reader.cpp
+++ b/be/src/load/group_commit/wal/wal_reader.cpp
@@ -15,144 +15,110 @@
// specific language governing permissions and limitations
// under the License.
-#include "load/group_commit/wal/wal_reader.h"
+#include "wal_reader.h"
-#include <crc32c/crc32c.h>
+#include <absl/strings/str_split.h>
-#include <string_view>
-#include <utility>
+#include "agent/be_exec_version_manager.h"
+#include "common/logging.h"
+#include "core/block/block.h"
+#include "cpp/sync_point.h"
+#include "load/group_commit/wal/wal_manager.h"
+#include "runtime/runtime_state.h"
-#include "common/status.h"
-#include "io/fs/file_reader.h"
-#include "io/fs/file_system.h"
-#include "io/fs/path.h"
-#include "load/group_commit/wal/wal_writer.h"
-#include "util/coding.h"
-#include "util/string_util.h"
-
-namespace doris {
-
-WalReader::WalReader(const std::string& file_name) : _file_name(file_name),
_offset(0) {}
-
-WalReader::~WalReader() = default;
-Status WalReader::_deserialize(PBlock& block, const std::string& buf, size_t
block_len,
- size_t bytes_read) {
- if (UNLIKELY(!block.ParseFromString(buf))) {
- return Status::InternalError(
- "failed to deserialize row, file_size=" +
std::to_string(file_reader->size()) +
- ", read_offset=" + std::to_string(_offset) + +", block_bytes="
+
- std::to_string(block_len) + ", read_block_bytes=" +
std::to_string(bytes_read));
- }
- return Status::OK();
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+WalReader::WalReader(RuntimeState* state) : _state(state) {
+ _wal_id = state->wal_id();
}
-std::pair<int64_t, int64_t> parse_db_tb_from_wal_path(const std::string&
wal_path) {
- auto ret = split(wal_path, "/");
- DCHECK_GT(ret.size(), 3);
- auto db_id_pos = ret.size() - 1 - 2;
- auto tb_id_pos = ret.size() - 1 - 1;
- auto db_id = std::stoll(ret[db_id_pos]);
- auto tb_id = std::stoll(ret[tb_id_pos]);
-
- return {db_id, tb_id};
-}
-
-Status WalReader::init() {
- auto [db_id, tb_id] = parse_db_tb_from_wal_path(_file_name);
- io::FileSystemSPtr fs;
- RETURN_IF_ERROR(determine_wal_fs(db_id, tb_id, fs));
- bool exists = false;
- RETURN_IF_ERROR(fs->exists(_file_name, &exists));
- if (!exists) {
- LOG(WARNING) << "not exist wal= " << _file_name;
- return Status::NotFound("wal {} doesn't exist", _file_name);
- }
- RETURN_IF_ERROR(fs->open_file(_file_name, &file_reader));
-
+Status WalReader::init_reader(const TupleDescriptor* tuple_descriptor) {
+ _tuple_descriptor = tuple_descriptor;
+ RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_path(_wal_id,
_wal_path));
+ _wal_reader = std::make_shared<doris::WalFileReader>(_wal_path);
+ RETURN_IF_ERROR(_wal_reader->init());
return Status::OK();
}
-Status WalReader::finalize() {
- if (file_reader) {
- return file_reader->close();
- }
- return Status::OK();
-}
-
-Status WalReader::read_block(PBlock& block) {
- if (_offset >= file_reader->size()) {
- return Status::EndOfFile("end of wal file");
+Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+ //read src block
+ PBlock pblock;
+ auto st = _wal_reader->read_block(pblock);
+ if (st.is<ErrorCode::END_OF_FILE>()) {
+ LOG(INFO) << "read eof on wal:" << _wal_path;
+ *read_rows = 0;
+ *eof = true;
+ return Status::OK();
}
- size_t bytes_read = 0;
- uint8_t row_len_buf[WalWriter::LENGTH_SIZE];
- RETURN_IF_ERROR(
- file_reader->read_at(_offset, {row_len_buf,
WalWriter::LENGTH_SIZE}, &bytes_read));
- _offset += WalWriter::LENGTH_SIZE;
- size_t block_len = decode_fixed64_le(row_len_buf);
- if (block_len == 0) {
- return Status::DataQualityError("fail to read wal {} ,block is empty",
_file_name);
+ if (!st.ok()) {
+ LOG(WARNING) << "Failed to read wal on path = " << _wal_path;
+ return st;
}
- if (_offset == file_reader->size()) {
- LOG(WARNING) << "need read block with length=" << block_len << ", but
offset=" << _offset
- << " reached end of WAL (path=" << _file_name
- << ", size=" << file_reader->size() << ")";
- return Status::EndOfFile("end of wal file");
+ int be_exec_version = pblock.has_be_exec_version() ?
pblock.be_exec_version() : 0;
+ if (!BeExecVersionManager::check_be_exec_version(be_exec_version)) {
+ return Status::DataQualityError("check be exec version fail when
reading wal file {}",
+ _wal_path);
}
- // read block
- std::string block_buf;
- block_buf.resize(block_len);
- RETURN_IF_ERROR(file_reader->read_at(_offset, {block_buf.c_str(),
block_len}, &bytes_read));
- RETURN_IF_ERROR(_deserialize(block, block_buf, block_len, bytes_read));
- _offset += block_len;
- // checksum
- uint8_t checksum_len_buf[WalWriter::CHECKSUM_SIZE];
- RETURN_IF_ERROR(file_reader->read_at(_offset, {checksum_len_buf,
WalWriter::CHECKSUM_SIZE},
- &bytes_read));
- _offset += WalWriter::CHECKSUM_SIZE;
- uint32_t checksum = decode_fixed32_le(checksum_len_buf);
- RETURN_IF_ERROR(_check_checksum(block_buf.data(), block_len, checksum));
- return Status::OK();
-}
-
-Status WalReader::read_header(uint32_t& version, std::string& col_ids) {
- if (file_reader->size() == 0) {
- return Status::DataQualityError("empty file");
- }
- size_t bytes_read = 0;
- std::string magic_str;
- magic_str.resize(k_wal_magic_length);
- RETURN_IF_ERROR(file_reader->read_at(_offset, magic_str, &bytes_read));
- if (strcmp(magic_str.c_str(), k_wal_magic) != 0) {
- return Status::Corruption("Bad wal file {}: magic number not match",
_file_name);
+ Block src_block;
+ size_t uncompressed_size = 0;
+ int64_t uncompressed_time = 0;
+ RETURN_IF_ERROR(src_block.deserialize(pblock, &uncompressed_size,
&uncompressed_time));
+ //convert to dst block
+ Block dst_block;
+ int index = 0;
+ auto output_block_columns = block->get_columns_with_type_and_name();
+ size_t output_block_column_size = output_block_columns.size();
+ TEST_SYNC_POINT_CALLBACK("WalReader::set_column_id_count",
&_column_id_count);
+ TEST_SYNC_POINT_CALLBACK("WalReader::set_out_block_column_size",
&output_block_column_size);
+ if (_column_id_count != src_block.columns() ||
+ output_block_column_size != _tuple_descriptor->slots().size()) {
+ return Status::InternalError(
+ "not equal wal _column_id_count={} vs wal block columns
size={}, "
+ "output block columns size={} vs tuple_descriptor size={}",
+ std::to_string(_column_id_count),
std::to_string(src_block.columns()),
+ std::to_string(output_block_column_size),
+ std::to_string(_tuple_descriptor->slots().size()));
}
- _offset += k_wal_magic_length;
- uint8_t version_buf[WalWriter::VERSION_SIZE];
- RETURN_IF_ERROR(
- file_reader->read_at(_offset, {version_buf,
WalWriter::VERSION_SIZE}, &bytes_read));
- _offset += WalWriter::VERSION_SIZE;
- version = decode_fixed32_le(version_buf);
- uint8_t len_buf[WalWriter::LENGTH_SIZE];
- RETURN_IF_ERROR(file_reader->read_at(_offset, {len_buf,
WalWriter::LENGTH_SIZE}, &bytes_read));
- _offset += WalWriter::LENGTH_SIZE;
- size_t len = decode_fixed64_le(len_buf);
- col_ids.resize(len);
- RETURN_IF_ERROR(file_reader->read_at(_offset, col_ids, &bytes_read));
- _offset += len;
- if (len != bytes_read) {
- return Status::InternalError("failed to read header expected= " +
std::to_string(len) +
- ",actually=" +
std::to_string(bytes_read));
+ for (auto* slot_desc : _tuple_descriptor->slots()) {
+ auto pos = _column_pos_map[slot_desc->col_unique_id()];
+ if (pos >= src_block.columns()) {
+ return Status::InternalError("read wal {} fail, pos {}, columns
size {}", _wal_path,
+ pos, src_block.columns());
+ }
+ vectorized::ColumnPtr column_ptr =
src_block.get_by_position(pos).column;
+ if (!column_ptr && slot_desc->is_nullable()) {
+ column_ptr = make_nullable(column_ptr);
+ }
+ dst_block.insert(index, vectorized::ColumnWithTypeAndName(
+ std::move(column_ptr),
output_block_columns[index].type,
+ output_block_columns[index].name));
+ index++;
}
+ block->swap(dst_block);
+ *read_rows = block->rows();
+ VLOG_DEBUG << "read block rows:" << *read_rows;
return Status::OK();
}
-Status WalReader::_check_checksum(const char* binary, size_t size, uint32_t
checksum) {
- uint32_t computed_checksum = crc32c::Crc32c(binary, size);
- if (LIKELY(computed_checksum == checksum)) {
- return Status::OK();
+Status WalReader::get_columns(std::unordered_map<std::string, DataTypePtr>*
name_to_type,
+ std::unordered_set<std::string>* missing_cols) {
+ std::string col_ids;
+ RETURN_IF_ERROR(_wal_reader->read_header(_version, col_ids));
+ std::vector<std::string> column_id_vector =
+ absl::StrSplit(col_ids, ",", absl::SkipWhitespace());
+ _column_id_count = column_id_vector.size();
+ try {
+ int64_t pos = 0;
+ for (auto col_id_str : column_id_vector) {
+ auto col_id = std::strtoll(col_id_str.c_str(), nullptr, 10);
+ _column_pos_map.emplace(col_id, pos);
+ pos++;
+ }
+ } catch (const std::invalid_argument& e) {
+ return Status::InvalidArgument("Invalid format, {}", e.what());
}
- return Status::InternalError("checksum failed for wal=" + _file_name +
- ", computed checksum=" +
std::to_string(computed_checksum) +
- ", expected=" + std::to_string(checksum));
+ return Status::OK();
}
-} // namespace doris
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/load/group_commit/wal/wal_reader.h
b/be/src/load/group_commit/wal/wal_reader.h
index 10a1f1e2229..b0ccd2b5008 100644
--- a/be/src/load/group_commit/wal/wal_reader.h
+++ b/be/src/load/group_commit/wal/wal_reader.h
@@ -16,32 +16,41 @@
// under the License.
#pragma once
+#include "format/generic_reader.h"
+#include "load/group_commit/wal/wal_file_reader.h"
+#include "runtime/descriptors.h"
-#include <gen_cpp/internal_service.pb.h>
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+struct ScannerCounter;
+class WalReader : public GenericReader {
+ ENABLE_FACTORY_CREATOR(WalReader);
-#include "common/status.h"
-#include "io/fs/file_reader_writer_fwd.h"
-
-namespace doris {
-
-class WalReader {
public:
- explicit WalReader(const std::string& file_name);
- ~WalReader();
-
- Status init();
- Status finalize();
-
- Status read_block(PBlock& block);
- Status read_header(uint32_t& version, std::string& col_ids);
+ WalReader(RuntimeState* state);
+ ~WalReader() override = default;
+ Status init_reader(const TupleDescriptor* tuple_descriptor);
+ Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+ Status get_columns(std::unordered_map<std::string, DataTypePtr>*
name_to_type,
+ std::unordered_set<std::string>* missing_cols) override;
+
+ Status close() override {
+ if (_wal_reader) {
+ return _wal_reader->finalize();
+ }
+ return Status::OK();
+ }
private:
- Status _deserialize(PBlock& block, const std::string& buf, size_t
block_len, size_t bytes_read);
- Status _check_checksum(const char* binary, size_t size, uint32_t checksum);
-
- std::string _file_name;
- size_t _offset;
- io::FileReaderSPtr file_reader;
+ RuntimeState* _state = nullptr;
+ int64_t _wal_id;
+ std::string _wal_path;
+ std::shared_ptr<doris::WalFileReader> _wal_reader = nullptr;
+ const TupleDescriptor* _tuple_descriptor = nullptr;
+ // column_id, column_pos
+ std::map<int64_t, int64_t> _column_pos_map;
+ int64_t _column_id_count;
+ uint32_t _version = 0;
};
-
-} // namespace doris
\ No newline at end of file
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/load/group_commit/wal/wal_table.cpp
b/be/src/load/group_commit/wal/wal_table.cpp
index 8d99f8ca509..4c6c0e1edb5 100644
--- a/be/src/load/group_commit/wal/wal_table.cpp
+++ b/be/src/load/group_commit/wal/wal_table.cpp
@@ -334,7 +334,8 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t
tb_id,
}
Status WalTable::_read_wal_header(const std::string& wal_path, std::string&
columns) {
- std::shared_ptr<doris::WalReader> wal_reader =
std::make_shared<WalReader>(wal_path);
+ std::shared_ptr<doris::WalFileReader> wal_reader =
+ std::make_shared<doris::WalFileReader>(wal_path);
RETURN_IF_ERROR(wal_reader->init());
uint32_t version = 0;
RETURN_IF_ERROR(wal_reader->read_header(version, columns));
diff --git a/be/test/format/wal/wal_reader_writer_test.cpp
b/be/test/format/wal/wal_reader_writer_test.cpp
index 4cad033f027..d18c5ec97e0 100644
--- a/be/test/format/wal/wal_reader_writer_test.cpp
+++ b/be/test/format/wal/wal_reader_writer_test.cpp
@@ -27,7 +27,7 @@
#include "exec/exchange/vdata_stream_recvr.h"
#include "gmock/gmock.h"
#include "io/fs/local_file_system.h"
-#include "load/group_commit/wal/wal_reader.h"
+#include "load/group_commit/wal/wal_file_reader.h"
#include "load/group_commit/wal/wal_writer.h"
#include "runtime/exec_env.h"
#include "service/brpc.h"
@@ -121,7 +121,7 @@ TEST_F(WalReaderWriterTest, TestWriteAndRead1) {
}
static_cast<void>(wal_writer.finalize());
// read block
- auto wal_reader = WalReader(file_name);
+ auto wal_reader = WalFileReader(file_name);
static_cast<void>(wal_reader.init());
auto block_count = 0;
while (true) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]