This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refact_reader_branch by this
push:
new dae05ba674c Framework to do delete filtering (#63442)
dae05ba674c is described below
commit dae05ba674cac4c737e25521ef5b46551d855539
Author: Gabriel <[email protected]>
AuthorDate: Wed May 20 16:25:47 2026 +0800
Framework to do delete filtering (#63442)
---
be/src/format/csv/csv_reader.cpp | 2 +-
be/src/format/json/new_json_reader.cpp | 2 +-
be/src/format/native/native_reader.cpp | 2 +-
be/src/format/orc/vorc_reader.cpp | 2 +-
be/src/format/parquet/vparquet_reader.cpp | 2 +-
be/src/format/reader/expr/delete_predicate.cpp | 77 ++++++++++++++++++
be/src/format/reader/expr/delete_predicate.h | 57 ++++++++++++++
be/src/format/reader/file_reader.h | 2 +
be/src/format/reader/table/paimon_reader.cpp | 42 ++++++++++
be/src/format/reader/table/paimon_reader.h | 36 +++++++++
be/src/format/reader/table_reader.cpp | 103 +++++++++++++++++++++++++
be/src/format/reader/table_reader.h | 39 ++++++++--
be/src/format/table/deletion_vector_reader.cpp | 19 ++---
be/src/format/table/deletion_vector_reader.h | 29 ++++++-
be/src/io/file_factory.cpp | 13 ++--
be/src/io/file_factory.h | 3 +-
gensrc/thrift/Exprs.thrift | 2 +
gensrc/thrift/Opcodes.thrift | 2 +
18 files changed, 401 insertions(+), 33 deletions(-)
diff --git a/be/src/format/csv/csv_reader.cpp b/be/src/format/csv/csv_reader.cpp
index 539132c7c9f..4231b8eb20c 100644
--- a/be/src/format/csv/csv_reader.cpp
+++ b/be/src/format/csv/csv_reader.cpp
@@ -638,7 +638,7 @@ Status CsvReader::_create_file_reader(bool need_schema) {
} else {
_file_description.mtime = _range.__isset.modification_time ?
_range.modification_time : 0;
io::FileReaderOptions reader_options =
- FileFactory::get_reader_options(_state, _file_description);
+ FileFactory::get_reader_options(_state->query_options(),
_file_description);
io::FileReaderSPtr file_reader;
if (_io_ctx_holder) {
file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
diff --git a/be/src/format/json/new_json_reader.cpp
b/be/src/format/json/new_json_reader.cpp
index da141437fcf..89992105cb8 100644
--- a/be/src/format/json/new_json_reader.cpp
+++ b/be/src/format/json/new_json_reader.cpp
@@ -478,7 +478,7 @@ Status NewJsonReader::_open_file_reader(bool need_schema) {
} else {
_file_description.mtime = _range.__isset.modification_time ?
_range.modification_time : 0;
io::FileReaderOptions reader_options =
- FileFactory::get_reader_options(_state, _file_description);
+ FileFactory::get_reader_options(_state->query_options(),
_file_description);
io::FileReaderSPtr file_reader;
if (_io_ctx_holder) {
file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
diff --git a/be/src/format/native/native_reader.cpp
b/be/src/format/native/native_reader.cpp
index 565bab20231..32fb7d660ad 100644
--- a/be/src/format/native/native_reader.cpp
+++ b/be/src/format/native/native_reader.cpp
@@ -125,7 +125,7 @@ Status NativeReader::init_reader() {
}
io::FileReaderOptions reader_options =
- FileFactory::get_reader_options(_state, file_description);
+ FileFactory::get_reader_options(_state->query_options(),
file_description);
auto reader_res = io::DelegateReader::create_file_reader(
_profile, system_properties, file_description, reader_options,
io::DelegateReader::AccessMode::RANDOM, _io_ctx);
diff --git a/be/src/format/orc/vorc_reader.cpp
b/be/src/format/orc/vorc_reader.cpp
index bcb1a8d70f4..25db29c4962 100644
--- a/be/src/format/orc/vorc_reader.cpp
+++ b/be/src/format/orc/vorc_reader.cpp
@@ -348,7 +348,7 @@ Status OrcReader::_create_file_reader() {
_file_description.mtime =
_scan_range.__isset.modification_time ?
_scan_range.modification_time : 0;
io::FileReaderOptions reader_options =
- FileFactory::get_reader_options(_state, _file_description);
+ FileFactory::get_reader_options(_state->query_options(),
_file_description);
io::FileReaderSPtr inner_reader;
if (_io_ctx_holder != nullptr) {
inner_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
diff --git a/be/src/format/parquet/vparquet_reader.cpp
b/be/src/format/parquet/vparquet_reader.cpp
index a2f2356085b..35cb3b1944a 100644
--- a/be/src/format/parquet/vparquet_reader.cpp
+++ b/be/src/format/parquet/vparquet_reader.cpp
@@ -311,7 +311,7 @@ Status ParquetReader::_open_file() {
_file_description.mtime =
_scan_range.__isset.modification_time ?
_scan_range.modification_time : 0;
io::FileReaderOptions reader_options =
- FileFactory::get_reader_options(_state, _file_description);
+ FileFactory::get_reader_options(_state->query_options(),
_file_description);
_file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
_profile, _system_properties, _file_description,
reader_options,
io::DelegateReader::AccessMode::RANDOM, _io_ctx));
diff --git a/be/src/format/reader/expr/delete_predicate.cpp
b/be/src/format/reader/expr/delete_predicate.cpp
new file mode 100644
index 00000000000..8a4ac54102f
--- /dev/null
+++ b/be/src/format/reader/expr/delete_predicate.cpp
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/reader/expr/delete_predicate.h"
+
+#include <fmt/format.h>
+#include <gen_cpp/Exprs_types.h>
+#include <glog/logging.h>
+
+#include <algorithm>
+#include <cstddef>
+#include <ostream>
+
+#include "common/status.h"
+#include "core/block/block.h"
+#include "core/block/column_numbers.h"
+#include "core/block/column_with_type_and_name.h"
+#include "core/block/columns_with_type_and_name.h"
+
+namespace doris {
+
+// Creates a boolean (DataTypeBool) PREDICATE node with opcode DELETE.
+// `deleted_rows` is bound to the `_deleted_rows` reference member, not copied,
+// so the vector must outlive this expression.
+DeletePredicate::DeletePredicate(const std::vector<int64_t>& deleted_rows)
+ : VExpr(), _deleted_rows(deleted_rows) {
+ _node_type = TExprNodeType::PREDICATE;
+ _opcode = TExprOpcode::DELETE;
+ _data_type = std::make_shared<DataTypeBool>();
+}
+
+// Runs VExpr::prepare, then sets the name returned by expr_name()/debug_string()
+// and marks the expression as prepared (checked by open()).
+// NOTE(review): RETURN_IF_ERROR_OR_PREPARED presumably also returns early when the
+// expression was already prepared — confirm against the macro definition.
+Status DeletePredicate::prepare(RuntimeState* state, const RowDescriptor& desc,
+ VExprContext* context) {
+ RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
+ _expr_name = "DeletePredicate";
+ _prepare_finished = true;
+ return Status::OK();
+}
+
+// Opens the expression; must follow prepare() (DCHECKed). Sets `_open_finished`,
+// which execute_column_impl() DCHECKs before evaluating a non-null block.
+Status DeletePredicate::open(RuntimeState* state, VExprContext* context,
+ FunctionContext::FunctionStateScope scope) {
+ DCHECK(_prepare_finished);
+ RETURN_IF_ERROR_OR_PREPARED(VExpr::open(state, context, scope));
+ _open_finished = true;
+ return Status::OK();
+}
+
+// Only delegates to VExpr::close; this class adds nothing of its own to release.
+void DeletePredicate::close(VExprContext* context,
FunctionContext::FunctionStateScope scope) {
+ VExpr::close(context, scope);
+}
+
+// Intended to produce a boolean column for the `count` rows of `block` by
+// matching row positions against `_deleted_rows`.
+// The matching logic is not implemented yet. Fail explicitly instead of returning
+// OK while leaving `result_column` unset (a null column the caller cannot use).
+Status DeletePredicate::execute_column_impl(VExprContext* context, const Block* block,
+                                            const Selector* selector, size_t count,
+                                            ColumnPtr& result_column) const {
+    DCHECK(_open_finished || block == nullptr);
+    // TODO: build the filter column from `_deleted_rows` and assign `result_column`.
+    return Status::NotSupported(
+            "DeletePredicate is not implemented yet ({} deleted rows, {} input rows)",
+            _deleted_rows.size(), count);
+}
+
+// The debug representation is just the expression name assigned in prepare().
+std::string DeletePredicate::debug_string() const {
+ return _expr_name;
+}
+
+} // namespace doris
diff --git a/be/src/format/reader/expr/delete_predicate.h
b/be/src/format/reader/expr/delete_predicate.h
new file mode 100644
index 00000000000..feb8093ea5c
--- /dev/null
+++ b/be/src/format/reader/expr/delete_predicate.h
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "common/object_pool.h"
+#include "common/status.h"
+#include "exprs/function_context.h"
+#include "exprs/vexpr.h"
+
+namespace doris {
+class RowDescriptor;
+class RuntimeState;
+class TExprNode;
+class Block;
+class VExprContext;
+} // namespace doris
+
+namespace doris {
+
+// Boolean expression (opcode DELETE) meant to filter rows of a table read against a
+// list of deleted rows, e.g. rows decoded from an Iceberg/Paimon deletion vector.
+class DeletePredicate final : public VExpr {
+    ENABLE_FACTORY_CREATOR(DeletePredicate);
+
+public:
+    // `deleted_rows` is borrowed, not copied: the vector must outlive this expression.
+    // NOTE(review): presumably the caller passes rows owned by the split's delete-rows
+    // cache — confirm at the construction site.
+    // explicit: a std::vector<int64_t> must never silently convert into an expression.
+    explicit DeletePredicate(const std::vector<int64_t>& deleted_rows);
+    ~DeletePredicate() override = default;
+
+    Status execute_column_impl(VExprContext* context, const Block* block, const Selector* selector,
+                               size_t count, ColumnPtr& result_column) const override;
+    Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override;
+    Status open(RuntimeState* state, VExprContext* context,
+                FunctionContext::FunctionStateScope scope) override;
+    void close(VExprContext* context, FunctionContext::FunctionStateScope scope) override;
+    std::string debug_string() const override;
+    // Always 0, independent of `seed`.
+    // NOTE(review): presumably 0 marks the expression as non-digestible — confirm
+    // against VExpr::get_digest.
+    uint64_t get_digest(uint64_t /*seed*/) const override { return 0; }
+    const std::string& expr_name() const override { return _expr_name; }
+
+private:
+    std::string _expr_name;
+    // Borrowed reference; see the constructor comment about lifetime.
+    const std::vector<int64_t>& _deleted_rows;
+};
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/format/reader/file_reader.h
b/be/src/format/reader/file_reader.h
index edebdcaff42..6dfbb4a8420 100644
--- a/be/src/format/reader/file_reader.h
+++ b/be/src/format/reader/file_reader.h
@@ -62,6 +62,8 @@ struct FileLocalFilter {
// 表达式过滤。适合 cast、复杂表达式或 reader_expression_map 生成的临时列过滤。
// 它通常不能直接驱动 row group stats、page index、dictionary、bloom filter。
VExprContextSPtr conjunct;
+ // Delete filter expression (e.g. a DeletePredicate built from a list of deleted rows).
+ VExprContextSPtr delete_conjunct;
// 结构化列谓词。适合文件层 pruning,例如 min/max、page index、dictionary、
// bloom filter 等只理解单列谓词的优化。
diff --git a/be/src/format/reader/table/paimon_reader.cpp
b/be/src/format/reader/table/paimon_reader.cpp
new file mode 100644
index 00000000000..713d1a97e68
--- /dev/null
+++ b/be/src/format/reader/table/paimon_reader.cpp
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/reader/table/paimon_reader.h"
+
+#include "format/table/deletion_vector_reader.h"
+
+namespace doris::paimon {
+
+// Describes the Paimon deletion file attached to the split, if any.
+// Returns false (leaving `desc` untouched) when paimon_params has no deletion_file.
+// The cache key is the deletion file path followed by the raw bytes of the offset,
+// so blobs at different offsets of the same file get different keys.
+bool PaimonReader::_parse_delete_file(const TTableFormatFileDesc& t_desc, DeleteFileDesc& desc) {
+    const auto& paimon_params = t_desc.paimon_params;
+    if (!paimon_params.__isset.deletion_file) {
+        return false;
+    }
+    const auto& file = paimon_params.deletion_file;
+
+    desc.key.assign(file.path);
+    desc.key.append(reinterpret_cast<const char*>(&file.offset), sizeof(file.offset));
+
+    desc.path = file.path;
+    desc.start_offset = file.offset;
+    // Read the 4-byte length prefix in addition to the `length` bytes it announces.
+    desc.size = file.length + 4;
+    desc.file_size = -1;
+    return true;
+}
+
+} // namespace doris::paimon
diff --git a/be/src/format/reader/table/paimon_reader.h
b/be/src/format/reader/table/paimon_reader.h
new file mode 100644
index 00000000000..d0f33c7a90c
--- /dev/null
+++ b/be/src/format/reader/table/paimon_reader.h
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "format/reader/table_reader.h"
+
+namespace doris {
+struct DeleteFileDesc;
+}
+namespace doris::paimon {
+
+// Paimon table reader. Currently it only supplies the deletion-file lookup that
+// TableReader consults while preparing a split.
+class PaimonReader final : public reader::TableReader {
+public:
+ ENABLE_FACTORY_CREATOR(PaimonReader);
+ ~PaimonReader() final = default;
+
+protected:
+ // Fills `desc` from paimon_params.deletion_file; returns false when the split has none.
+ bool _parse_delete_file(const TTableFormatFileDesc& t_desc,
DeleteFileDesc& desc) override;
+};
+
+} // namespace doris::paimon
diff --git a/be/src/format/reader/table_reader.cpp
b/be/src/format/reader/table_reader.cpp
new file mode 100644
index 00000000000..b89641c0bd2
--- /dev/null
+++ b/be/src/format/reader/table_reader.cpp
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/reader/table_reader.h"
+
+#include <vector>
+
+#include "common/status.h"
+#include "format/reader/column_mapper.h"
+#include "format/table/deletion_vector_reader.h"
+
+namespace doris::reader {
+
+// Per-split setup: records the split's partition values, then resolves the split's
+// deletion vector (if any) through _parse_delete_predicates().
+Status TableReader::prepare_split(const SplitReadOptions& options) {
+    // `options` is const, so std::move(options.partition_values) could only copy;
+    // make that copy explicit.
+    _partition_values = options.partition_values;
+    return _parse_delete_predicates(options);
+}
+
+// Resolves the deletion vector of the split in `options` (when the table format
+// reports one via _parse_delete_file) into `_delete_rows`.
+//
+// Decoded rows are cached in `options.cache` under DeleteFileDesc::key, so the blob
+// is read and decoded only on a cache miss. Blob layout, as decoded below:
+//   [4-byte big-endian length][4-byte big-endian magic][serialized roaring bitmap]
+// where `length` counts the magic and bitmap bytes (i.e. desc.size - 4).
+Status TableReader::_parse_delete_predicates(const SplitReadOptions& options) {
+    // prepare_split() runs once per split: a split without a deletion vector must not
+    // inherit the rows resolved for the previous split.
+    _delete_rows = nullptr;
+
+    DeleteFileDesc desc {.fs_name = options.current_range.fs_name};
+    if (!_parse_delete_file(options.current_range.table_format_params, desc)) {
+        return Status::OK();
+    }
+    if (options.cache == nullptr) [[unlikely]] {
+        return Status::InternalError("No delete rows cache provided for split {}",
+                                     options.current_range.path);
+    }
+
+    ReadProfile* profile = _profile.get();
+    Status create_status = Status::OK();
+    _delete_rows = options.cache->get<DeleteRows>(desc.key, [&]() -> DeleteRows* {
+        constexpr size_t kHeaderBytes = 8; // length prefix + magic number
+        constexpr uint32_t MAGIC_NUMBER = 1581511376;
+
+        if (desc.size < static_cast<int64_t>(kHeaderBytes)) [[unlikely]] {
+            create_status = Status::RuntimeError(
+                    "DeletionVector deserialize error: invalid size {}", desc.size);
+            return nullptr;
+        }
+
+        DeletionVectorReader dv_reader(_runtime_state, _scanner_profile, *_scan_params, desc,
+                                       _io_ctx);
+        create_status = dv_reader.open();
+        if (!create_status.ok()) [[unlikely]] {
+            return nullptr;
+        }
+
+        const auto bytes_read = static_cast<size_t>(desc.size);
+        std::vector<char> buffer(bytes_read);
+        create_status = dv_reader.read_at(desc.start_offset, {buffer.data(), bytes_read});
+        if (!create_status.ok()) [[unlikely]] {
+            return nullptr;
+        }
+
+        // Decode big-endian fields independently of the host byte order.
+        auto read_be_u32 = [](const char* p) -> uint32_t {
+            const auto* u = reinterpret_cast<const unsigned char*>(p);
+            return (static_cast<uint32_t>(u[0]) << 24) | (static_cast<uint32_t>(u[1]) << 16) |
+                   (static_cast<uint32_t>(u[2]) << 8) | static_cast<uint32_t>(u[3]);
+        };
+        const char* buf = buffer.data();
+        const uint32_t actual_length = read_be_u32(buf);
+        if (actual_length != bytes_read - 4) [[unlikely]] {
+            create_status = Status::RuntimeError(
+                    "DeletionVector deserialize error: length not match, "
+                    "actual length: {}, expect length: {}",
+                    actual_length, bytes_read - 4);
+            return nullptr;
+        }
+        const uint32_t magic_number = read_be_u32(buf + 4);
+        if (magic_number != MAGIC_NUMBER) [[unlikely]] {
+            create_status = Status::RuntimeError(
+                    "DeletionVector deserialize error: invalid magic number {}", magic_number);
+            return nullptr;
+        }
+
+        roaring::Roaring roaring_bitmap;
+        SCOPED_TIMER(profile != nullptr ? profile->parse_delete_file_time : nullptr);
+        try {
+            // Only the bytes after the 8-byte header belong to the bitmap; a larger
+            // bound would let readSafe run past the end of `buffer`.
+            roaring_bitmap = roaring::Roaring::readSafe(buf + kHeaderBytes,
+                                                        bytes_read - kHeaderBytes);
+        } catch (const std::runtime_error& e) {
+            create_status = Status::RuntimeError(
+                    "DeletionVector deserialize error: failed to deserialize roaring bitmap, "
+                    "{}",
+                    e.what());
+            return nullptr;
+        }
+
+        // Allocated only after every failure path, so the early returns above have
+        // nothing to free.
+        auto delete_rows = std::make_unique<DeleteRows>();
+        delete_rows->reserve(roaring_bitmap.cardinality());
+        for (uint32_t row : roaring_bitmap) {
+            delete_rows->push_back(row);
+        }
+        if (profile != nullptr && profile->num_delete_rows != nullptr) {
+            COUNTER_UPDATE(profile->num_delete_rows, delete_rows->size());
+        }
+        // NOTE(review): ownership is handed to `options.cache`; confirm that
+        // ShardedKVCache deletes the entries it stores.
+        return delete_rows.release();
+    });
+    return create_status;
+}
+} // namespace doris::reader
diff --git a/be/src/format/reader/table_reader.h
b/be/src/format/reader/table_reader.h
index 4d8fe0620c8..d14e1e78261 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -33,16 +33,20 @@
#include "exprs/vexpr_context.h"
#include "exprs/vexpr_fwd.h"
#include "format/reader/column_mapper.h"
+#include "format/reader/expr/delete_predicate.h"
#include "format/reader/expr/literal.h"
#include "format/reader/file_reader.h"
namespace doris {
class Block;
class ColumnPredicate;
+struct DeleteFileDesc;
} // namespace doris
namespace doris::reader {
+using DeleteRows = std::vector<int64_t>;
+
// table/global schema 中的列视图。
// Iceberg 场景下,id 默认对应 Iceberg field id。该结构不描述文件中的物理列。
struct TableColumn {
@@ -90,13 +94,21 @@ struct ScanTask {
std::unique_ptr<BaseDataFile> data_file;
};
-struct ReadProfile {};
+struct ReadProfile {
+    // Default to nullptr so an unregistered counter is detectable instead of being an
+    // indeterminate pointer; check for nullptr before updating.
+    RuntimeProfile::Counter* num_delete_files = nullptr;
+    RuntimeProfile::Counter* num_delete_rows = nullptr;
+    RuntimeProfile::Counter* parse_delete_file_time = nullptr;
+};
struct TableReadOptions {
const std::vector<TableColumn> projected_columns;
// All conjuncts from scan operator
const VExprContext conjuncts;
const FileFormat format;
+ TFileScanRangeParams* scan_params;
+ io::IOContext* io_ctx;
+ RuntimeState* runtime_state;
+ RuntimeProfile* scanner_profile;
// Each task denotes a descriptor of a single file to read, along with
file-level metadata such as stats and delete files.
std::vector<std::unique_ptr<ScanTask>> scan_tasks;
@@ -105,6 +117,8 @@ struct TableReadOptions {
struct SplitReadOptions {
std::map<std::string, Field> partition_values;
+    // Cache of decoded delete rows keyed by DeleteFileDesc::key; not owned. Must be
+    // set when the split may carry a deletion vector.
+    ShardedKVCache* cache = nullptr;
+    // Scan range of the split being prepared.
+    TFileRangeDesc current_range;
};
// table-level reader 基类。
@@ -117,6 +131,11 @@ public:
// 初始化 table reader 的通用运行参数。
// 子类可以在自己的 init(options) 中调用该方法;这里不接收具体表格式 schema/task。
virtual Status init(TableReadOptions options) {
+ _scan_params = options.scan_params;
+ _format = options.format;
+ _io_ctx = options.io_ctx;
+ _runtime_state = options.runtime_state;
+ _scanner_profile = options.scanner_profile;
_scan_tasks = std::move(_options.scan_tasks);
_next_task_idx = 0;
_profile = std::move(options.profile);
@@ -129,10 +148,7 @@ public:
}
// 读取当前 split/partition 之前初始化。
- virtual Status prepare_split(SplitReadOptions options) {
- _partition_values = std::move(options.partition_values);
- return Status::OK();
- }
+ virtual Status prepare_split(const SplitReadOptions& options);
// table-level 动态过滤入口。
// 该方法用于根据 split、partition value 或文件级统计判断是否可以跳过后续 reader。
@@ -193,6 +209,9 @@ public:
}
protected:
+ // Table-format hook: when the split described by `t_desc` carries a deletion
+ // vector, fill `desc` and return true. The base implementation reports none.
+ virtual bool _parse_delete_file(const TTableFormatFileDesc& t_desc,
DeleteFileDesc& desc) {
+ return false;
+ }
// 切换到下一个 reader 的通用流程。
// 该方法先关闭当前 reader,再打开下一个具体 reader;子类不应重复实现这个循环。
Status create_next_reader(bool* eos) {
@@ -262,6 +281,16 @@ protected:
size_t _next_task_idx = 0;
std::map<int32_t, TableFilter> _table_filters;
std::unique_ptr<ReadProfile> _profile;
+    // Rows deleted in the current split, parsed from DELETION_VECTOR in Iceberg and
+    // Paimon. Obtained from SplitReadOptions::cache (not owned here); nullptr until a
+    // split with a deletion vector has been prepared.
+    DeleteRows* _delete_rows = nullptr;
+    // The following are set from TableReadOptions in init().
+    TFileScanRangeParams* _scan_params = nullptr;
+    io::IOContext* _io_ctx = nullptr;
+    RuntimeState* _runtime_state = nullptr;
+    RuntimeProfile* _scanner_profile = nullptr;
+    FileFormat _format;
+
+private:
+    // Resolves the split's deletion vector (if any) into `_delete_rows`.
+    Status _parse_delete_predicates(const SplitReadOptions& options);
};
} // namespace doris::reader
diff --git a/be/src/format/table/deletion_vector_reader.cpp
b/be/src/format/table/deletion_vector_reader.cpp
index bfe34a5f555..d7e33c923d9 100644
--- a/be/src/format/table/deletion_vector_reader.cpp
+++ b/be/src/format/table/deletion_vector_reader.cpp
@@ -54,9 +54,9 @@ Status DeletionVectorReader::_create_file_reader() {
return Status::EndOfFile("stop read.");
}
- _file_description.mtime = _range.__isset.modification_time ?
_range.modification_time : 0;
+ _file_description.mtime = _desc.modification_time;
io::FileReaderOptions reader_options =
- FileFactory::get_reader_options(_state, _file_description);
+ FileFactory::get_reader_options(_state->query_options(),
_file_description);
_file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
_profile, _system_properties, _file_description, reader_options,
io::DelegateReader::AccessMode::RANDOM, _io_ctx));
@@ -64,20 +64,13 @@ Status DeletionVectorReader::_create_file_reader() {
}
void DeletionVectorReader::_init_file_description() {
- _file_description.path = _range.path;
- _file_description.file_size = _range.__isset.file_size ? _range.file_size
: -1;
- if (_range.__isset.fs_name) {
- _file_description.fs_name = _range.fs_name;
- }
+ _file_description.path = _desc.path;
+ _file_description.file_size = _desc.file_size;
+ _file_description.fs_name = _desc.fs_name;
}
void DeletionVectorReader::_init_system_properties() {
- if (_range.__isset.file_type) {
- // for compatibility
- _system_properties.system_type = _range.file_type;
- } else {
- _system_properties.system_type = _params.file_type;
- }
+ _system_properties.system_type = _params.file_type;
_system_properties.properties = _params.properties;
_system_properties.hdfs_params = _params.hdfs_params;
if (_params.__isset.broker_addresses) {
diff --git a/be/src/format/table/deletion_vector_reader.h
b/be/src/format/table/deletion_vector_reader.h
index 0663f3b2849..b030f048415 100644
--- a/be/src/format/table/deletion_vector_reader.h
+++ b/be/src/format/table/deletion_vector_reader.h
@@ -36,6 +36,16 @@ struct IOContext;
} // namespace io
namespace doris {
+// Location of a deletion-vector blob and the byte range to read from it.
+struct DeleteFileDesc {
+ // Key under which the decoded rows are cached (format-specific; may be empty).
+ std::string key = "";
+ std::string path = "";
+ std::string fs_name = "";
+ // Byte offset in `path` where reading starts.
+ int64_t start_offset = 0;
+ // Number of bytes to read from `start_offset`.
+ int64_t size = 0;
+ // Size of the whole file; -1 when not provided.
+ int64_t file_size = -1;
+ int64_t modification_time = 0;
+};
+
class DeletionVectorReader {
ENABLE_FACTORY_CREATOR(DeletionVectorReader);
@@ -43,7 +53,22 @@ public:
DeletionVectorReader(RuntimeState* state, RuntimeProfile* profile,
const TFileScanRangeParams& params, const
TFileRangeDesc& range,
io::IOContext* io_ctx)
- : _state(state), _profile(profile), _range(range),
_params(params), _io_ctx(io_ctx) {}
+ : _state(state), _profile(profile), _params(params),
_io_ctx(io_ctx) {
+ // Translate the scan range into a DeleteFileDesc; optional thrift fields that are
+ // not set fall back to "" (fs_name), -1 (file_size) and 0 (modification_time).
+ _desc = DeleteFileDesc {
+ .key = "",
+ .path = range.path,
+ .fs_name = range.__isset.fs_name ? range.fs_name : "",
+ .start_offset = range.start_offset,
+ .size = range.size,
+ .file_size = range.__isset.file_size ? range.file_size : -1,
+ .modification_time = range.__isset.modification_time ?
range.modification_time : 0};
+ }
+    // Reads the blob described by `desc` directly, without a TFileRangeDesc.
+    // `_desc` is initialized in the member-init list, in declaration order
+    // (_state, _profile, _desc, _params, _io_ctx), instead of being
+    // default-constructed and then assigned.
+    DeletionVectorReader(RuntimeState* state, RuntimeProfile* profile,
+                         const TFileScanRangeParams& params, const DeleteFileDesc& desc,
+                         io::IOContext* io_ctx)
+            : _state(state), _profile(profile), _desc(desc), _params(params), _io_ctx(io_ctx) {}
~DeletionVectorReader() = default;
Status open();
Status read_at(size_t offset, Slice result);
@@ -56,7 +81,7 @@ private:
private:
RuntimeState* _state = nullptr;
RuntimeProfile* _profile = nullptr;
- const TFileRangeDesc& _range;
+ DeleteFileDesc _desc;
const TFileScanRangeParams& _params;
io::IOContext* _io_ctx = nullptr;
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 553cdc4460e..9610bc02859 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -57,21 +57,20 @@ namespace doris {
constexpr std::string_view RANDOM_CACHE_BASE_PATH = "random";
+// Derives FileReaderOptions for `fd` from the query options. FILE_BLOCK_CACHE is
+// enabled only when config::enable_file_cache, the query's enable_file_cache and
+// fd.file_cache_admission all allow it. file_cache_base_path is copied unless it is
+// the RANDOM_CACHE_BASE_PATH ("random") sentinel.
+// NOTE(review): the old RuntimeState* overload tolerated a null state. Callers now
+// pass `state->query_options()` themselves and must guarantee a non-null state.
-io::FileReaderOptions FileFactory::get_reader_options(RuntimeState* state,
+io::FileReaderOptions FileFactory::get_reader_options(const TQueryOptions&
option,
const
io::FileDescription& fd) {
io::FileReaderOptions opts {
.cache_base_path {},
.file_size = fd.file_size,
.mtime = fd.mtime,
};
- if (config::enable_file_cache && state != nullptr &&
- state->query_options().__isset.enable_file_cache &&
- state->query_options().enable_file_cache && fd.file_cache_admission) {
+ if (config::enable_file_cache && option.__isset.enable_file_cache &&
option.enable_file_cache &&
+ fd.file_cache_admission) {
opts.cache_type = io::FileCachePolicy::FILE_BLOCK_CACHE;
}
- if (state != nullptr &&
state->query_options().__isset.file_cache_base_path &&
- state->query_options().file_cache_base_path != RANDOM_CACHE_BASE_PATH)
{
- opts.cache_base_path = state->query_options().file_cache_base_path;
+ if (option.__isset.file_cache_base_path &&
+ option.file_cache_base_path != RANDOM_CACHE_BASE_PATH) {
+ opts.cache_base_path = option.file_cache_base_path;
}
return opts;
}
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 7d662e4fdde..a32c8077c48 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -16,6 +16,7 @@
// under the License.
#pragma once
+#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
@@ -83,7 +84,7 @@ class FileFactory {
ENABLE_FACTORY_CREATOR(FileFactory);
public:
- static io::FileReaderOptions get_reader_options(RuntimeState* state,
+ static io::FileReaderOptions get_reader_options(const TQueryOptions&
option,
const io::FileDescription&
fd);
/// Create a temporary FileSystem for accessing file corresponding to
`file_description`
diff --git a/gensrc/thrift/Exprs.thrift b/gensrc/thrift/Exprs.thrift
index 2644ecec417..967499aac69 100644
--- a/gensrc/thrift/Exprs.thrift
+++ b/gensrc/thrift/Exprs.thrift
@@ -88,6 +88,8 @@ enum TExprNodeType {
TRY_CAST_EXPR = 41
// for search DSL function
SEARCH_EXPR = 42,
+ // Normal predicate expression
+ PREDICATE = 43,
}
//enum TAggregationOp {
diff --git a/gensrc/thrift/Opcodes.thrift b/gensrc/thrift/Opcodes.thrift
index 1e4002357e7..a2d70979948 100644
--- a/gensrc/thrift/Opcodes.thrift
+++ b/gensrc/thrift/Opcodes.thrift
@@ -97,4 +97,6 @@ enum TExprOpcode {
MATCH_REGEXP = 76,
MATCH_PHRASE_EDGE = 77,
TRY_CAST = 78,
+ // Delete operator from Iceberg/Paimon
+ DELETE = 79,
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]