This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch variant-sparse
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/variant-sparse by this push:
new 92125f3b855 add stats
92125f3b855 is described below
commit 92125f3b85589a87425da412c835b0233999e063
Author: eldenmoon <[email protected]>
AuthorDate: Tue Dec 10 12:14:31 2024 +0800
add stats
---
be/src/olap/rowset/segment_v2/segment_writer.cpp | 12 --
.../segment_v2/variant_column_writer_impl.cpp | 197 ++++++++++++++-------
.../rowset/segment_v2/variant_column_writer_impl.h | 35 +++-
.../rowset/segment_v2/vertical_segment_writer.cpp | 11 --
be/src/vec/columns/column_object.cpp | 170 +++++++-----------
be/src/vec/columns/column_object.h | 31 ++--
be/src/vec/common/schema_util.cpp | 23 ++-
be/src/vec/common/schema_util.h | 3 +
gensrc/proto/segment_v2.proto | 9 +-
9 files changed, 275 insertions(+), 216 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index f9b3928298b..1bfcfbb999b 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -422,18 +422,6 @@ Status
SegmentWriter::append_block_with_variant_subcolumns(vectorized::Block& da
_flush_schema->append_column(tablet_column);
_olap_data_convertor->clear_source_content();
}
- // sparse_columns
- for (const auto& entry :
vectorized::schema_util::get_sorted_subcolumns(
- object_column.get_sparse_subcolumns())) {
- TabletColumn sparse_tablet_column = generate_column_info(entry);
- _flush_schema->mutable_column_by_uid(parent_column->unique_id())
- .append_sparse_column(sparse_tablet_column);
-
- // add sparse column to footer
- auto* column_pb = _footer.mutable_columns(i);
- init_column_meta(column_pb->add_sparse_columns(), -1,
sparse_tablet_column,
- _flush_schema);
- }
}
// Update rowset schema, tablet's tablet schema will be updated when build
Rowset
diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
index 72884ab775b..958df5780bd 100644
--- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
+++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
@@ -38,78 +38,61 @@ VariantColumnWriterImpl::VariantColumnWriterImpl(const
ColumnWriterOptions& opts
_tablet_column = column;
}
-Status VariantColumnWriterImpl::finalize() {
- auto* ptr = assert_cast<vectorized::ColumnObject*>(_column.get());
- ptr->finalize(vectorized::ColumnObject::FinalizeMode::WRITE_MODE);
-
- // convert each subcolumns to storage format and add data to sub columns
writers buffer
- auto olap_data_convertor =
std::make_unique<vectorized::OlapBlockDataConvertor>();
-
- DCHECK(ptr->is_finalized());
-
- if (ptr->is_null_root()) {
- auto root_type = vectorized::make_nullable(
- std::make_shared<vectorized::ColumnObject::MostCommonType>());
- auto root_col = root_type->create_column();
- root_col->insert_many_defaults(ptr->rows());
- ptr->create_root(root_type, std::move(root_col));
- }
-
- // common extracted columns
- const auto& parent_column = *_tablet_column;
-
- // generate column info by entry info
- auto generate_column_info = [&](const auto& entry) {
- const std::string& column_name =
- parent_column.name_lower_case() + "." + entry->path.get_path();
- const vectorized::DataTypePtr& final_data_type_from_object =
- entry->data.get_least_common_type();
- vectorized::PathInDataBuilder full_path_builder;
- auto full_path =
full_path_builder.append(parent_column.name_lower_case(), false)
- .append(entry->path.get_parts(), false)
- .build();
- // set unique_id and parent_unique_id, will use unique_id to get
iterator correct
- return vectorized::schema_util::get_column_by_type(
- final_data_type_from_object, column_name,
- vectorized::schema_util::ExtraInfo {.unique_id =
parent_column.unique_id(),
- .parent_unique_id =
parent_column.unique_id(),
- .path_info = full_path});
- };
+Status VariantColumnWriterImpl::_process_root_column(vectorized::ColumnObject*
ptr,
+
vectorized::OlapBlockDataConvertor* converter,
+ size_t num_rows, int&
column_id) {
// root column
ColumnWriterOptions root_opts = _opts;
_root_writer = std::unique_ptr<ColumnWriter>(new ScalarColumnWriter(
- _opts,
std::unique_ptr<Field>(FieldFactory::create(parent_column)),
_opts.file_writer));
+ _opts,
std::unique_ptr<Field>(FieldFactory::create(*_tablet_column)),
+ _opts.file_writer));
RETURN_IF_ERROR(_root_writer->init());
- // subcolumn
- size_t num_rows = _column->size();
- for (auto& subcolumn : _subcolumn_writers) {
- RETURN_IF_ERROR(subcolumn->init());
- }
-
// make sure the root type
auto expected_root_type =
vectorized::make_nullable(std::make_shared<vectorized::ColumnObject::MostCommonType>());
ptr->ensure_root_node_type(expected_root_type);
- int column_id = 0;
- // convert root column data from engine format to storage layer format
- olap_data_convertor->add_column_data_convertor(parent_column);
-
RETURN_IF_ERROR(olap_data_convertor->set_source_content_with_specifid_column(
+ converter->add_column_data_convertor(*_tablet_column);
+ RETURN_IF_ERROR(converter->set_source_content_with_specifid_column(
{ptr->get_root()->get_ptr(), nullptr, ""}, 0, num_rows,
column_id));
- auto [status, column] =
olap_data_convertor->convert_column_data(column_id);
+ auto [status, column] = converter->convert_column_data(column_id);
if (!status.ok()) {
return status;
}
- // use real null data instead of root
const uint8_t* nullmap =
vectorized::check_and_get_column<vectorized::ColumnUInt8>(_null_column.get())
->get_data()
.data();
RETURN_IF_ERROR(_root_writer->append(nullmap, column->get_data(),
num_rows));
++column_id;
- olap_data_convertor->clear_source_content();
+ converter->clear_source_content();
+
+ _opts.meta->set_num_rows(num_rows);
+ return Status::OK();
+}
+Status VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnObject*
ptr,
+
vectorized::OlapBlockDataConvertor* converter,
+ size_t num_rows, int&
column_id) {
+ // generate column info by entry info
+ auto generate_column_info = [&](const auto& entry) {
+ const std::string& column_name =
+ _tablet_column->name_lower_case() + "." +
entry->path.get_path();
+ const vectorized::DataTypePtr& final_data_type_from_object =
+ entry->data.get_least_common_type();
+ vectorized::PathInDataBuilder full_path_builder;
+ auto full_path =
full_path_builder.append(_tablet_column->name_lower_case(), false)
+ .append(entry->path.get_parts(), false)
+ .build();
+ // set unique_id and parent_unique_id, will use unique_id to get
iterator correct
+ return vectorized::schema_util::get_column_by_type(
+ final_data_type_from_object, column_name,
+ vectorized::schema_util::ExtraInfo {.unique_id =
_tablet_column->unique_id(),
+ .parent_unique_id =
_tablet_column->unique_id(),
+ .path_info = full_path});
+ };
+
_statistics._subcolumns_non_null_size.reserve(ptr->get_subcolumns().size());
// convert sub column data from engine format to storage layer format
for (const auto& entry :
vectorized::schema_util::get_sorted_subcolumns(ptr->get_subcolumns())) {
@@ -120,22 +103,111 @@ Status VariantColumnWriterImpl::finalize() {
CHECK(entry->data.is_finalized());
int current_column_id = column_id++;
TabletColumn tablet_column = generate_column_info(entry);
- RETURN_IF_ERROR(_create_column_writer(current_column_id,
tablet_column, parent_column,
+ RETURN_IF_ERROR(_create_column_writer(current_column_id,
tablet_column, *_tablet_column,
_opts.rowset_ctx->tablet_schema));
- olap_data_convertor->add_column_data_convertor(tablet_column);
-
RETURN_IF_ERROR(olap_data_convertor->set_source_content_with_specifid_column(
+ converter->add_column_data_convertor(tablet_column);
+ RETURN_IF_ERROR(converter->set_source_content_with_specifid_column(
{entry->data.get_finalized_column_ptr()->get_ptr(),
entry->data.get_least_common_type(), tablet_column.name()},
0, num_rows, current_column_id));
- auto [status, column] =
olap_data_convertor->convert_column_data(current_column_id);
+ auto [status, column] =
converter->convert_column_data(current_column_id);
if (!status.ok()) {
return status;
}
const uint8_t* nullmap = column->get_nullmap();
RETURN_IF_ERROR(_subcolumn_writers[current_column_id - 1]->append(
nullmap, column->get_data(), num_rows));
- olap_data_convertor->clear_source_content();
+ converter->clear_source_content();
+ _subcolumn_opts[current_column_id - 1].meta->set_num_rows(num_rows);
+
+ // get stastics
+
_statistics._subcolumns_non_null_size.push_back(entry->data.get_non_null_value_size());
+ }
+ return Status::OK();
+}
+
+// Writes the sparse column (a Map<String, String> of path -> encoded value) for this
+// variant, advances column_id past it, and counts path occurrences for statistics.
+Status VariantColumnWriterImpl::_process_sparse_column(
+        vectorized::ColumnObject* ptr, vectorized::OlapBlockDataConvertor* converter,
+        size_t num_rows, int& column_id) {
+    // create sparse column writer; its meta is appended to the segment footer
+    TabletColumn sparse_column =
+            vectorized::schema_util::create_sparse_column(_tablet_column->unique_id());
+    ColumnWriterOptions sparse_writer_opts;
+    sparse_writer_opts.meta = _opts.footer->add_columns();
+
+    _init_column_meta(sparse_writer_opts.meta, column_id, sparse_column);
+    RETURN_IF_ERROR(ColumnWriter::create_map_writer(sparse_writer_opts, &sparse_column,
+                                                    _opts.file_writer, &_sparse_column_writer));
+    RETURN_IF_ERROR(_sparse_column_writer->init());
+
+    // convert sparse column data from engine format to storage layer format;
+    // the convertor slot index must match column_id, hence the shared counter
+    converter->add_column_data_convertor(sparse_column);
+    RETURN_IF_ERROR(converter->set_source_content_with_specifid_column(
+            {ptr->get_sparse_column()->get_ptr(), nullptr, ""}, 0, num_rows, column_id));
+    auto [status, column] = converter->convert_column_data(column_id);
+    if (!status.ok()) {
+        return status;
+    }
+    RETURN_IF_ERROR(
+            _sparse_column_writer->append(column->get_nullmap(), column->get_data(), num_rows));
+    ++column_id;
+    converter->clear_source_content();
+
+    // collect statistics: occurrence count of each path in the sparse keys column,
+    // tracking at most MAX_SHARED_DATA_STATISTICS_SIZE distinct paths
+    // todo: reuse the statistics collected during the compaction stage
+    const auto [sparse_data_paths, _] = ptr->get_sparse_data_paths_and_values();
+    for (size_t i = 0; i != sparse_data_paths->size(); ++i) {
+        auto path = sparse_data_paths->get_data_at(i);
+        if (auto it = _statistics._sparse_column_non_null_size.find(path);
+            it != _statistics._sparse_column_non_null_size.end()) {
+            ++it->second;
+        } else if (_statistics._sparse_column_non_null_size.size() <
+                   VariantStatistics::MAX_SHARED_DATA_STATISTICS_SIZE) {
+            _statistics._sparse_column_non_null_size.emplace(path, 1);
+        }
+    }
+
+    sparse_writer_opts.meta->set_num_rows(num_rows);
+    return Status::OK();
+}
+
+// Serializes the collected statistics into the column meta protobuf.
+void VariantStatistics::to_pb(VariantStatisticsPB* stats) const {
+    // subcolumn counts are positional, in the order the subcolumns were written
+    for (size_t non_null_size : _subcolumns_non_null_size) {
+        stats->add_subcolumn_non_null_size(static_cast<uint32_t>(non_null_size));
+    }
+    // sparse path counts are keyed by path; StringRef keys are copied into owned strings
+    auto* sparse_stats = stats->mutable_sparse_column_non_null_size();
+    for (const auto& [path, non_null_size] : _sparse_column_non_null_size) {
+        (*sparse_stats)[path.to_string()] = static_cast<uint32_t>(non_null_size);
+    }
+}
+
+// Finalizes the buffered ColumnObject in WRITE_MODE, then feeds the root column,
+// the materialized subcolumns and the sparse column into their writers, and finally
+// records the collected statistics into this column's meta.
+Status VariantColumnWriterImpl::finalize() {
+ auto* ptr = assert_cast<vectorized::ColumnObject*>(_column.get());
+
RETURN_IF_ERROR(ptr->finalize(vectorized::ColumnObject::FinalizeMode::WRITE_MODE));
+
+ // convert each subcolumns to storage format and add data to sub columns
writers buffer
+ auto olap_data_convertor =
std::make_unique<vectorized::OlapBlockDataConvertor>();
+
+ DCHECK(ptr->is_finalized());
+
+ // a missing root is materialized as an all-default nullable MostCommonType column
+ if (ptr->is_null_root()) {
+ auto root_type = vectorized::make_nullable(
+ std::make_shared<vectorized::ColumnObject::MostCommonType>());
+ auto root_col = root_type->create_column();
+ root_col->insert_many_defaults(ptr->rows());
+ ptr->create_root(root_type, std::move(root_col));
}
+
+ size_t num_rows = _column->size();
+ // shared convertor slot index; each _process_* helper advances it by reference
+ int column_id = 0;
+
+ // convert root column data from engine format to storage layer format
+ RETURN_IF_ERROR(_process_root_column(ptr, olap_data_convertor.get(),
num_rows, column_id));
+
+ // process and append each subcolumns to sub columns writers buffer
+ RETURN_IF_ERROR(_process_subcolumns(ptr, olap_data_convertor.get(),
num_rows, column_id));
+
+ // process sparse column and append to sparse writer buffer
+ RETURN_IF_ERROR(_process_sparse_column(ptr, olap_data_convertor.get(),
num_rows, column_id));
+
+ // set statistics info
+ _statistics.to_pb(_opts.meta->mutable_variant_statistics());
+
_is_finalized = true;
return Status::OK();
}
@@ -164,6 +236,7 @@ uint64_t VariantColumnWriterImpl::estimate_buffer_size() {
size += column_writer->estimate_buffer_size();
}
size += _root_writer->estimate_buffer_size();
+ size += _sparse_column_writer->estimate_buffer_size();
return size;
}
@@ -172,14 +245,10 @@ Status VariantColumnWriterImpl::finish() {
RETURN_IF_ERROR(finalize());
}
RETURN_IF_ERROR(_root_writer->finish());
+ RETURN_IF_ERROR(_sparse_column_writer->finish());
for (auto& column_writer : _subcolumn_writers) {
RETURN_IF_ERROR(column_writer->finish());
}
- _opts.meta->set_num_rows(_root_writer->get_next_rowid());
- for (auto& suboptions : _subcolumn_opts) {
- suboptions.meta->set_num_rows(_root_writer->get_next_rowid());
- }
- return Status::OK();
return Status::OK();
}
Status VariantColumnWriterImpl::write_data() {
@@ -187,6 +256,7 @@ Status VariantColumnWriterImpl::write_data() {
RETURN_IF_ERROR(finalize());
}
RETURN_IF_ERROR(_root_writer->write_data());
+ RETURN_IF_ERROR(_sparse_column_writer->write_data());
for (auto& column_writer : _subcolumn_writers) {
RETURN_IF_ERROR(column_writer->write_data());
}
@@ -197,6 +267,7 @@ Status VariantColumnWriterImpl::write_ordinal_index() {
RETURN_IF_ERROR(finalize());
}
RETURN_IF_ERROR(_root_writer->write_ordinal_index());
+ RETURN_IF_ERROR(_sparse_column_writer->write_ordinal_index());
for (auto& column_writer : _subcolumn_writers) {
RETURN_IF_ERROR(column_writer->write_ordinal_index());
}
@@ -277,10 +348,6 @@ void
VariantColumnWriterImpl::_init_column_meta(ColumnMetaPB* meta, uint32_t col
for (uint32_t i = 0; i < column.get_subtype_count(); ++i) {
_init_column_meta(meta->add_children_columns(), column_id,
column.get_sub_column(i));
}
- // add sparse column to footer
- for (uint32_t i = 0; i < column.num_sparse_columns(); i++) {
- _init_column_meta(meta->add_sparse_columns(), -1,
column.sparse_column_at(i));
- }
};
Status VariantColumnWriterImpl::_create_column_writer(uint32_t cid, const
TabletColumn& column,
diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
index 348dd1ab0cb..87f67e7b1ef 100644
--- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
+++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
@@ -24,11 +24,25 @@
#include "olap/tablet_schema.h"
#include "vec/columns/column.h"
-namespace doris::segment_v2 {
+namespace doris {
+
+namespace vectorized {
+class ColumnObject;
+class OlapBlockDataConvertor;
+} // namespace vectorized
+namespace segment_v2 {
class ColumnWriter;
class ScalarColumnWriter;
+// Statistics gathered while finalizing a variant column; serialized by to_pb into
+// ColumnMetaPB::variant_statistics.
+struct VariantStatistics {
+ // upper bound on the number of distinct sparse paths that are tracked
+ constexpr static size_t MAX_SHARED_DATA_STATISTICS_SIZE = 10000;
+ // non-null value count of each materialized subcolumn, in write order
+ std::vector<size_t> _subcolumns_non_null_size;
+ // occurrence count of each path found in the sparse column's keys.
+ // NOTE(review): keys are non-owning StringRefs into the sparse column's key data;
+ // they must not be read after that column is released — confirm lifetime.
+ std::map<StringRef, size_t> _sparse_column_non_null_size;
+
+ void to_pb(VariantStatisticsPB* stats) const;
+};
+
class VariantColumnWriterImpl {
public:
VariantColumnWriterImpl(const ColumnWriterOptions& opts, const
TabletColumn* column);
@@ -54,15 +68,30 @@ private:
Status _create_column_writer(uint32_t cid, const TabletColumn& column,
const TabletColumn& parent_column,
const TabletSchemaSPtr& tablet_schema);
+ Status _process_root_column(vectorized::ColumnObject* ptr,
+ vectorized::OlapBlockDataConvertor* converter,
size_t num_rows,
+ int& column_id);
+ Status _process_sparse_column(vectorized::ColumnObject* ptr,
+ vectorized::OlapBlockDataConvertor*
converter, size_t num_rows,
+ int& column_id);
+ Status _process_subcolumns(vectorized::ColumnObject* ptr,
+ vectorized::OlapBlockDataConvertor* converter,
size_t num_rows,
+ int& column_id);
// prepare a column for finalize
doris::vectorized::MutableColumnPtr _column;
doris::vectorized::MutableColumnPtr _null_column;
ColumnWriterOptions _opts;
const TabletColumn* _tablet_column = nullptr;
bool _is_finalized = false;
- // for sparse column and root column
+ // for root column
std::unique_ptr<ColumnWriter> _root_writer;
+ // for sparse column
+ std::unique_ptr<ColumnWriter> _sparse_column_writer;
std::vector<std::unique_ptr<ColumnWriter>> _subcolumn_writers;
std::vector<ColumnWriterOptions> _subcolumn_opts;
+
+ // staticstics which will be persisted in the footer
+ VariantStatistics _statistics;
};
-} // namespace doris::segment_v2
\ No newline at end of file
+} // namespace segment_v2
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 5c72cd6384a..089dac218fe 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -1054,17 +1054,6 @@ Status
VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock&
_flush_schema->append_column(tablet_column);
_olap_data_convertor->clear_source_content();
}
- // sparse_columns
- for (const auto& entry :
vectorized::schema_util::get_sorted_subcolumns(
- object_column.get_sparse_subcolumns())) {
- TabletColumn sparse_tablet_column = generate_column_info(entry);
- _flush_schema->mutable_column_by_uid(parent_column->unique_id())
- .append_sparse_column(sparse_tablet_column);
-
- // add sparse column to footer
- auto* column_pb = _footer.mutable_columns(i);
- _init_column_meta(column_pb->add_sparse_columns(), -1,
sparse_tablet_column);
- }
}
// Update rowset schema, tablet's tablet schema will be updated when build
Rowset
diff --git a/be/src/vec/columns/column_object.cpp
b/be/src/vec/columns/column_object.cpp
index 0aef86e30e2..2983d799166 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -36,6 +36,7 @@
#include <memory>
#include <optional>
#include <sstream>
+#include <unordered_map>
#include <vector>
#include "common/compiler_util.h" // IWYU pragma: keep
@@ -1091,7 +1092,7 @@ void ColumnObject::insert_range_from(const IColumn& src,
size_t start, size_t le
}
}
num_rows += length;
- finalize(FinalizeMode::READ_MODE);
+ finalize();
#ifndef NDEBUG
check_consistency();
#endif
@@ -1419,7 +1420,7 @@ void get_json_by_column_tree(rapidjson::Value& root,
rapidjson::Document::Alloca
Status ColumnObject::serialize_one_row_to_string(int64_t row, std::string*
output) const {
if (!is_finalized()) {
- const_cast<ColumnObject*>(this)->finalize(FinalizeMode::READ_MODE);
+ const_cast<ColumnObject*>(this)->finalize();
}
rapidjson::StringBuffer buf;
if (is_scalar_variant()) {
@@ -1435,7 +1436,7 @@ Status ColumnObject::serialize_one_row_to_string(int64_t
row, std::string* outpu
Status ColumnObject::serialize_one_row_to_string(int64_t row, BufferWritable&
output) const {
if (!is_finalized()) {
- const_cast<ColumnObject*>(this)->finalize(FinalizeMode::READ_MODE);
+ const_cast<ColumnObject*>(this)->finalize();
}
if (is_scalar_variant()) {
auto type = get_root_type();
@@ -1504,99 +1505,13 @@ Status
ColumnObject::serialize_one_row_to_json_format(int64_t row, rapidjson::St
return Status::OK();
}
-Status ColumnObject::merge_sparse_to_root_column() {
- CHECK(is_finalized());
- if (sparse_columns.empty()) {
- return Status::OK();
+size_t ColumnObject::Subcolumn::get_non_null_value_size() const {
+ size_t res = 0;
+ for (const auto& part : data) {
+ const auto& null_data = assert_cast<const
ColumnNullable&>(*part).get_null_map_data();
+ res += simd::count_zero_num((int8_t*)null_data.data(),
null_data.size());
}
- ColumnPtr src =
subcolumns.get_mutable_root()->data.get_finalized_column_ptr();
- MutableColumnPtr mresult = src->clone_empty();
- const ColumnNullable* src_null = assert_cast<const
ColumnNullable*>(src.get());
- const ColumnString* src_column_ptr =
- assert_cast<const ColumnString*>(&src_null->get_nested_column());
- rapidjson::StringBuffer buffer;
- doc_structure = std::make_shared<rapidjson::Document>();
- rapidjson::Document::AllocatorType& allocator =
doc_structure->GetAllocator();
- get_json_by_column_tree(*doc_structure, allocator,
sparse_columns.get_root());
-
-#ifndef NDEBUG
- VLOG_DEBUG << "dump structure " <<
JsonFunctions::print_json_value(*doc_structure);
-#endif
-
- ColumnNullable* result_column_nullable =
- assert_cast<ColumnNullable*>(mresult->assume_mutable().get());
- ColumnString* result_column_ptr =
-
assert_cast<ColumnString*>(&result_column_nullable->get_nested_column());
- result_column_nullable->reserve(num_rows);
- // parse each row to jsonb
- for (size_t i = 0; i < num_rows; ++i) {
- // root is not null, store original value, eg. the root is scalar type
like '[1]'
- if (!src_null->empty() && !src_null->is_null_at(i)) {
- result_column_ptr->insert_data(src_column_ptr->get_data_at(i).data,
-
src_column_ptr->get_data_at(i).size);
- result_column_nullable->get_null_map_data().push_back(0);
- continue;
- }
-
- // parse and encode sparse columns
- buffer.Clear();
- rapidjson::Value root(rapidjson::kNullType);
- if (!doc_structure->IsNull()) {
- root.CopyFrom(*doc_structure, doc_structure->GetAllocator());
- }
- size_t null_count = 0;
- Arena mem_pool;
- for (const auto& subcolumn : sparse_columns) {
- auto& column = subcolumn->data.get_finalized_column_ptr();
- if (assert_cast<const ColumnNullable&,
TypeCheckOnRelease::DISABLE>(*column).is_null_at(
- i)) {
- ++null_count;
- continue;
- }
- bool succ = find_and_set_leave_value(
- column, subcolumn->path,
subcolumn->data.get_least_common_type_serde(),
- subcolumn->data.get_least_common_type(),
- subcolumn->data.least_common_type.get_base_type_id(), root,
- doc_structure->GetAllocator(), mem_pool, i);
- if (succ && subcolumn->path.empty() && !root.IsObject()) {
- // root was modified, only handle root node
- break;
- }
- }
-
- // all null values, store null to sparse root
- if (null_count == sparse_columns.size()) {
- result_column_ptr->insert_default();
- result_column_nullable->get_null_map_data().push_back(1);
- continue;
- }
-
- // encode sparse columns into jsonb format
- compact_null_values(root, doc_structure->GetAllocator());
- // parse as jsonb value and put back to rootnode
- // TODO, we could convert to jsonb directly from rapidjson::Value for
better performance, instead of parsing
- JsonbParser parser;
- rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
- root.Accept(writer);
- bool res = parser.parse(buffer.GetString(), buffer.GetSize());
- if (!res) {
- return Status::InvalidArgument(
- "parse json failed, doc: {}"
- ", row_num:{}"
- ", error:{}",
- std::string(buffer.GetString(), buffer.GetSize()), i,
- JsonbErrMsg::getErrMsg(parser.getErrorCode()));
- }
-
result_column_ptr->insert_data(parser.getWriter().getOutput()->getBuffer(),
-
parser.getWriter().getOutput()->getSize());
- result_column_nullable->get_null_map_data().push_back(0);
- }
- subcolumns.get_mutable_root()->data.get_finalized_column().clear();
- // assign merged column, do insert_range_from to make a copy, instead of
replace the ptr itselft
- // to make sure the root column ptr is not changed
-
subcolumns.get_mutable_root()->data.get_finalized_column().insert_range_from(
- *mresult->get_ptr(), 0, num_rows);
- return Status::OK();
+ return res;
}
void ColumnObject::unnest(Subcolumns::NodePtr& entry, Subcolumns& subcolumns)
const {
@@ -1634,13 +1549,50 @@ void ColumnObject::unnest(Subcolumns::NodePtr& entry,
Subcolumns& subcolumns) co
}
}
-void ColumnObject::finalize(FinalizeMode mode) {
+// Finalizes every subcolumn. At most MAX_SUBCOLUMNS non-root subcolumns (those with
+// the most non-null values) stay materialized; the rest are merged into sparse_column.
+Status ColumnObject::finalize(FinalizeMode mode) {
     Subcolumns new_subcolumns;
     // finalize root first
     if (mode == FinalizeMode::WRITE_MODE || !is_null_root()) {
         new_subcolumns.create_root(subcolumns.get_root()->data);
         new_subcolumns.get_mutable_root()->data.finalize(mode);
     }
+
+    // Partition non-root subcolumns into those kept as real subcolumns and the
+    // remaining ones that will be encoded into the sparse column.
+    std::set<String> selected_subcolumns;
+    std::set<String> remaining_subcolumns;
+    if (subcolumns.size() > MAX_SUBCOLUMNS) {
+        // 1. collect the non-null value count of every non-root subcolumn
+        std::vector<std::pair<String, size_t>> sorted_by_size;
+        sorted_by_size.reserve(subcolumns.size());
+        for (auto&& entry : subcolumns) {
+            if (entry->data.is_root) {
+                continue;
+            }
+            sorted_by_size.emplace_back(entry->path.get_path(),
+                                        entry->data.get_non_null_value_size());
+        }
+        // 2. densest first; ties are broken by path so the selection is deterministic
+        std::sort(sorted_by_size.begin(), sorted_by_size.end(),
+                  [](const auto& a, const auto& b) {
+                      return a.second != b.second ? a.second > b.second : a.first < b.first;
+                  });
+        // 3. keep the first MAX_SUBCOLUMNS paths, 4. the rest go to the sparse column.
+        // Compared by value on purpose: std::min would bind the static member by
+        // reference (an ODR-use).
+        for (size_t i = 0; i < sorted_by_size.size(); ++i) {
+            if (i < MAX_SUBCOLUMNS) {
+                selected_subcolumns.insert(sorted_by_size[i].first);
+            } else {
+                remaining_subcolumns.insert(sorted_by_size[i].first);
+            }
+        }
+    } else {
+        // under the limit every non-root subcolumn stays materialized
+        for (auto&& entry : subcolumns) {
+            if (!entry->data.is_root) {
+                selected_subcolumns.insert(entry->path.get_path());
+            }
+        }
+    }
+
+    // finalize all subcolumns
     for (auto&& entry : subcolumns) {
         const auto& least_common_type = entry->data.get_least_common_type();
         /// Do not add subcolumns, which consists only from NULLs
@@ -1661,24 +1613,34 @@ void ColumnObject::finalize(FinalizeMode mode) {
if (entry->data.is_root) {
continue;
}
+ }
-    // Check and spilit sparse subcolumns, not support nested array at present
-    if (mode == FinalizeMode::WRITE_MODE && (entry->data.check_if_sparse_column(num_rows)) &&
-        !entry->path.has_nested_part()) {
-        // TODO seperate ambiguous path
-        sparse_columns.add(entry->path, entry->data);
-        continue;
+    // add selected subcolumns to new_subcolumns
+    for (auto&& entry : subcolumns) {
+        if (selected_subcolumns.find(entry->path.get_path()) != selected_subcolumns.end()) {
+            new_subcolumns.add(entry->path, entry->data);
         }
+    }
-        new_subcolumns.add(entry->path, entry->data);
+    // collect the remaining subcolumns; they are merged into sparse_column below
+    std::map<String, Subcolumn> remaining_sparse_subcolumns;
+    for (auto&& entry : subcolumns) {
+        // compare against the end() of the same container that was searched
+        if (remaining_subcolumns.find(entry->path.get_path()) != remaining_subcolumns.end()) {
+            remaining_sparse_subcolumns.emplace(entry->path.get_path(), entry->data);
+        }
     }
+
+    // merge and encode sparse column
+    RETURN_IF_ERROR(merge_sparse_columns(remaining_sparse_subcolumns));
+
     std::swap(subcolumns, new_subcolumns);
-    doc_structure = nullptr;
     _prev_positions.clear();
+    return Status::OK();
 }
 void ColumnObject::finalize() {
-    finalize(FinalizeMode::READ_MODE);
+    // the IColumn override returns void, so surface a failed Status instead of dropping it
+    THROW_IF_ERROR(finalize(FinalizeMode::READ_MODE));
 }
void ColumnObject::ensure_root_node_type(const DataTypePtr&
expected_root_type) {
diff --git a/be/src/vec/columns/column_object.h
b/be/src/vec/columns/column_object.h
index 21bb4469115..1475a168c23 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -38,6 +38,7 @@
#include "olap/tablet_schema.h"
#include "util/jsonb_document.h"
#include "vec/columns/column.h"
+#include "vec/columns/column_map.h"
#include "vec/columns/subcolumn_tree.h"
#include "vec/common/cow.h"
#include "vec/common/string_ref.h"
@@ -97,6 +98,7 @@ public:
constexpr static TypeIndex MOST_COMMON_TYPE_ID = TypeIndex::JSONB;
// Nullable(Array(Nullable(Object)))
const static DataTypePtr NESTED_TYPE;
+ const static size_t MAX_SUBCOLUMNS = 200;
// Finlize mode for subcolumns, write mode will estimate which subcolumns
are sparse columns(too many null values inside column),
// merge and encode them into a shared column in root column. Only affects
in flush block to segments.
// Otherwise read mode should be as default mode.
@@ -124,6 +126,8 @@ public:
return least_common_type.get_base();
}
+ size_t get_non_null_value_size() const;
+
const DataTypeSerDeSPtr& get_least_common_type_serde() const {
return least_common_type.get_serde();
}
@@ -240,12 +244,8 @@ private:
const bool is_nullable;
Subcolumns subcolumns;
size_t num_rows;
- // sparse columns will be merge and encoded into root column
- Subcolumns sparse_columns;
- // The rapidjson document format of Subcolumns tree structure
- // the leaves is null.In order to display whole document, copy
- // this structure and fill with Subcolumns sub items
- mutable std::shared_ptr<rapidjson::Document> doc_structure;
+ // sparse columns will be merge and encoded as ColumnMap<String, String>
+ WrappedPtr sparse_column;
using SubColumnWithName = std::pair<PathInData, const Subcolumn*>;
// Cached search results for previous row (keyed as index in JSON object)
- used as a hint.
@@ -280,12 +280,19 @@ public:
Status serialize_one_row_to_json_format(int64_t row,
rapidjson::StringBuffer* output,
bool* is_null) const;
- // merge multiple sub sparse columns into root
- Status merge_sparse_to_root_column();
+ // merge multiple sub sparse columns
+ Status merge_sparse_columns(const std::map<String, Subcolumn>&
remaing_subcolumns);
// ensure root node is a certain type
void ensure_root_node_type(const DataTypePtr& type);
+ // Returns the key (path) and value columns of sparse_column, which is a
+ // ColumnMap<String, String>; the returned pointers alias sparse_column's storage.
+ std::pair<ColumnString*, ColumnString*> get_sparse_data_paths_and_values()
{
+ auto& column_map = assert_cast<ColumnMap&>(*sparse_column);
+ auto& key = assert_cast<ColumnString&>(column_map.get_keys());
+ auto& value = assert_cast<ColumnString&>(column_map.get_values());
+ return {&key, &value};
+ }
+
// create jsonb root if missing
// notice: should only using in VariantRootColumnIterator
// since some datastructures(sparse columns are schema on read
@@ -345,14 +352,14 @@ public:
const Subcolumns& get_subcolumns() const { return subcolumns; }
- const Subcolumns& get_sparse_subcolumns() const { return sparse_columns; }
-
Subcolumns& get_subcolumns() { return subcolumns; }
+ ColumnPtr get_sparse_column() { return
sparse_column->convert_to_full_column_if_const(); }
+
PathsInData getKeys() const;
// use sparse_subcolumns_schema to record sparse column's path info and
type
- void finalize(FinalizeMode mode);
+ Status finalize(FinalizeMode mode);
/// Finalizes all subcolumns.
void finalize() override;
@@ -361,7 +368,7 @@ public:
MutableColumnPtr clone_finalized() const {
auto finalized = IColumn::mutate(get_ptr());
-
static_cast<ColumnObject*>(finalized.get())->finalize(FinalizeMode::READ_MODE);
+ static_cast<ColumnObject*>(finalized.get())->finalize();
return finalized;
}
diff --git a/be/src/vec/common/schema_util.cpp
b/be/src/vec/common/schema_util.cpp
index fd50af3e1fc..42f9240646f 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -542,14 +542,6 @@ Status parse_variant_columns(Block& block, const
std::vector<int>& variant_pos,
});
}
-Status encode_variant_sparse_subcolumns(ColumnObject& column) {
- // Make sure the root node is jsonb storage type
- auto expected_root_type =
make_nullable(std::make_shared<ColumnObject::MostCommonType>());
- column.ensure_root_node_type(expected_root_type);
- RETURN_IF_ERROR(column.merge_sparse_to_root_column());
- return Status::OK();
-}
-
// sort by paths in lexicographical order
vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
const vectorized::ColumnObject::Subcolumns& subcolumns) {
@@ -614,4 +606,19 @@ bool has_schema_index_diff(const TabletSchema* new_schema,
const TabletSchema* o
return new_schema_has_inverted_index != old_schema_has_inverted_index;
}
+// Builds the TabletColumn for a variant's sparse column: a MAP<STRING, STRING> named
+// ".sparse" that carries the parent variant's unique id.
+TabletColumn create_sparse_column(int32_t parent_unique_id) {
+    TColumn tcolumn;
+    tcolumn.column_name = ".sparse";
+    tcolumn.col_unique_id = parent_unique_id;
+    tcolumn.column_type = TColumnType {};
+    tcolumn.column_type.type = TPrimitiveType::MAP;
+
+    // key and value children are both STRING; configure the child, not the parent
+    TColumn child_tcolumn;
+    child_tcolumn.column_type = TColumnType {};
+    child_tcolumn.column_type.type = TPrimitiveType::STRING;
+    tcolumn.children_column.push_back(child_tcolumn);
+    tcolumn.children_column.push_back(child_tcolumn);
+    return TabletColumn {tcolumn};
+}
+
} // namespace doris::vectorized::schema_util
diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h
index 7c228ed2cc0..0507c9e2fe6 100644
--- a/be/src/vec/common/schema_util.h
+++ b/be/src/vec/common/schema_util.h
@@ -126,4 +126,7 @@ std::string dump_column(DataTypePtr type, const ColumnPtr&
col);
bool has_schema_index_diff(const TabletSchema* new_schema, const TabletSchema*
old_schema,
int32_t new_col_idx, int32_t old_col_idx);
+// create ColumnMap<String, String>
+TabletColumn create_sparse_column(int32_t parent_unique_id);
+
} // namespace doris::vectorized::schema_util
diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto
index 4c7183bae9a..37a4f0a70ee 100644
--- a/gensrc/proto/segment_v2.proto
+++ b/gensrc/proto/segment_v2.proto
@@ -159,6 +159,12 @@ message ColumnPathInfo {
optional uint32 parrent_column_unique_id = 4;
}
+// Statistics of a variant column, stored in ColumnMetaPB.variant_statistics.
+message VariantStatisticsPB {
+ // in the order of subcolumns in variant
+ repeated uint32 subcolumn_non_null_size = 1;
+ // occurrence count per path stored in the sparse column (bounded number of paths)
+ map<string, uint32> sparse_column_non_null_size = 2;
+}
+
message ColumnMetaPB {
// column id in table schema
optional uint32 column_id = 1;
@@ -192,11 +198,12 @@ message ColumnMetaPB {
optional int32 precision = 15; // ColumnMessage.precision
optional int32 frac = 16; // ColumnMessag
- repeated ColumnMetaPB sparse_columns = 17; // sparse column within a
variant column
+ repeated ColumnMetaPB sparse_columns = 17; // deprecated
optional bool result_is_nullable = 18; // used on agg_state type
optional string function_name = 19; // used on agg_state type
optional int32 be_exec_version = 20; // used on agg_state type
+ optional VariantStatisticsPB variant_statistics = 21; // only used in
variant type
}
message PrimaryKeyIndexMetaPB {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]