eldenmoon commented on code in PR #63192:
URL: https://github.com/apache/doris/pull/63192#discussion_r3231390450
##########
be/src/format/parquet/vparquet_column_reader.cpp:
##########
@@ -1001,6 +1157,393 @@ Status StructColumnReader::read_column_data(
return Status::OK();
}
+Status VariantColumnReader::init(io::FileReaderSPtr file, FieldSchema* field,
+ const tparquet::RowGroup& row_group, size_t
max_buf_size,
+ std::unordered_map<int,
tparquet::OffsetIndex>& col_offsets,
+ RuntimeState* state, bool in_collection,
+ const std::set<uint64_t>& column_ids,
+ const std::set<uint64_t>& filter_column_ids) {
+ _field_schema = field;
+ _variant_struct_field = std::make_unique<FieldSchema>(*field);
+
+ DataTypes child_types;
+ Strings child_names;
+ child_types.reserve(field->children.size());
+ child_names.reserve(field->children.size());
+ for (const auto& child : field->children) {
+ child_types.push_back(make_nullable(child.data_type));
+ child_names.push_back(child.name);
+ }
+ _variant_struct_type = std::make_shared<DataTypeStruct>(child_types,
child_names);
+ if (field->data_type->is_nullable()) {
+ _variant_struct_type = make_nullable(_variant_struct_type);
+ }
+ _variant_struct_field->data_type = _variant_struct_type;
+
+ RETURN_IF_ERROR(ParquetColumnReader::create(file,
_variant_struct_field.get(), row_group,
+ _row_ranges, _ctz, _io_ctx,
_struct_reader,
+ max_buf_size, col_offsets,
state, in_collection,
+ column_ids,
filter_column_ids));
+ _struct_reader->set_column_in_nested();
+ return Status::OK();
+}
+
+Status VariantColumnReader::_get_binary_field(const Field& field, std::string*
value,
+ bool* present) const {
+ if (field.is_null()) {
+ *present = false;
+ return Status::OK();
+ }
+ *present = true;
+ switch (field.get_type()) {
+ case TYPE_STRING:
+ *value = field.get<TYPE_STRING>();
+ return Status::OK();
+ case TYPE_CHAR:
+ *value = field.get<TYPE_CHAR>();
+ return Status::OK();
+ case TYPE_VARCHAR:
+ *value = field.get<TYPE_VARCHAR>();
+ return Status::OK();
+ case TYPE_VARBINARY: {
+ auto ref = field.get<TYPE_VARBINARY>().to_string_ref();
+ value->assign(ref.data, ref.size);
+ return Status::OK();
+ }
+ default:
+ return Status::Corruption("Parquet VARIANT binary field has unexpected
Doris type {}",
+ field.get_type_name());
+ }
+}
+
+Status VariantColumnReader::_field_to_json(const FieldSchema& field_schema,
const Field& field,
+ std::string* json, bool* present)
const {
+ if (field.is_null()) {
+ *present = false;
+ return Status::OK();
+ }
+ *present = true;
+ const DataTypePtr& type = remove_nullable(field_schema.data_type);
+ switch (type->get_primitive_type()) {
+ case TYPE_BOOLEAN:
+ case TYPE_TINYINT:
+ case TYPE_SMALLINT:
+ case TYPE_INT:
+ case TYPE_BIGINT:
+ case TYPE_LARGEINT:
+ case TYPE_DECIMALV2:
+ case TYPE_DECIMAL32:
+ case TYPE_DECIMAL64:
+ case TYPE_DECIMAL128I:
+ case TYPE_DECIMAL256:
+ json->append(field.to_debug_string(type->get_scale()));
+ return Status::OK();
+ case TYPE_FLOAT: {
+ const auto value = field.get<TYPE_FLOAT>();
+ json->append(std::isfinite(value) ?
field.to_debug_string(type->get_scale()) : "null");
+ return Status::OK();
+ }
+ case TYPE_DOUBLE: {
+ const auto value = field.get<TYPE_DOUBLE>();
+ json->append(std::isfinite(value) ?
field.to_debug_string(type->get_scale()) : "null");
+ return Status::OK();
+ }
+ case TYPE_TIMEV2:
+
json->append(std::to_string(static_cast<int64_t>(std::llround(field.get<TYPE_TIMEV2>()))));
+ return Status::OK();
+ case TYPE_DATE:
+
json->append(std::to_string(variant_date_value(field.get<TYPE_DATE>())));
+ return Status::OK();
+ case TYPE_DATETIME:
+
json->append(std::to_string(variant_datetime_value(field.get<TYPE_DATETIME>())));
+ return Status::OK();
+ case TYPE_DATEV2:
+
json->append(std::to_string(variant_date_value(field.get<TYPE_DATEV2>())));
+ return Status::OK();
+ case TYPE_DATETIMEV2:
+
json->append(std::to_string(variant_datetime_value(field.get<TYPE_DATETIMEV2>())));
+ return Status::OK();
+ case TYPE_TIMESTAMPTZ:
+
json->append(std::to_string(variant_datetime_value(field.get<TYPE_TIMESTAMPTZ>())));
+ return Status::OK();
+ case TYPE_STRING:
+ case TYPE_CHAR:
+ case TYPE_VARCHAR: {
+ std::string value = field.to_debug_string(type->get_scale());
+ append_json_string(value, json);
+ return Status::OK();
+ }
+ case TYPE_VARBINARY:
+ return Status::NotSupported("Parquet VARIANT binary typed_value is not
supported");
+ case TYPE_ARRAY:
+ return _array_field_to_json(field_schema, field,
+ assert_cast<const
DataTypeArray*>(type.get()), json);
+ case TYPE_STRUCT:
+ return _struct_field_to_json(field_schema, field, json);
+ default:
+ return Status::Corruption("Unsupported Parquet VARIANT typed_value
Doris type {}",
+ type->get_name());
+ }
+}
+
+Status VariantColumnReader::_array_field_to_json(const FieldSchema&
field_schema,
+ const Field& field,
+ const DataTypeArray*
array_type,
+ std::string* json) const {
+ const auto& values = field.get<TYPE_ARRAY>();
+ const FieldSchema* element_schema =
+ field_schema.children.empty() ? nullptr :
field_schema.children.data();
+ FieldSchema synthetic_element;
+ if (element_schema == nullptr) {
+ synthetic_element.data_type = array_type->get_nested_type();
+ element_schema = &synthetic_element;
+ }
+ json->push_back('[');
+ for (int i = 0; i < values.size(); ++i) {
+ if (i != 0) {
+ json->push_back(',');
+ }
+ std::string element_json;
+ bool element_present = false;
+ RETURN_IF_ERROR(
+ _field_to_json(*element_schema, values[i], &element_json,
&element_present));
+ json->append(element_present ? element_json : "null");
+ }
+ json->push_back(']');
+ return Status::OK();
+}
+
+Status VariantColumnReader::_struct_field_to_json(const FieldSchema&
field_schema,
+ const Field& field,
std::string* json) const {
+ const auto& values = field.get<TYPE_STRUCT>();
+ json->push_back('{');
+ for (int i = 0; i < values.size(); ++i) {
+ if (i != 0) {
+ json->push_back(',');
+ }
+ const FieldSchema& child_schema = field_schema.children[i];
+ append_json_string(child_schema.name, json);
+ json->push_back(':');
+ std::string child_json;
+ bool child_present = false;
+ RETURN_IF_ERROR(_field_to_json(child_schema, values[i], &child_json,
&child_present));
+ json->append(child_present ? child_json : "null");
+ }
+ json->push_back('}');
+ return Status::OK();
+}
+
+Status VariantColumnReader::_typed_value_to_json(const FieldSchema&
typed_value_field,
+ const Field& field, const
std::string& metadata,
+ std::string* json, bool*
present) const {
+ if (field.is_null()) {
+ *present = false;
+ return Status::OK();
+ }
+ const DataTypePtr& typed_type =
remove_nullable(typed_value_field.data_type);
+ if (typed_type->get_primitive_type() == TYPE_STRUCT) {
+ *present = true;
+ return _typed_struct_to_json(typed_value_field, field, metadata, json);
+ }
+ if (typed_type->get_primitive_type() == TYPE_ARRAY) {
+ *present = true;
+ return _typed_array_to_json(typed_value_field, field, metadata, json);
+ }
+ return _field_to_json(typed_value_field, field, json, present);
+}
+
+Status VariantColumnReader::_shredded_field_to_json(const FieldSchema&
field_schema,
+ const Field& field, const
std::string& metadata,
+ std::string* json, bool*
present) const {
+ if (is_variant_wrapper_field(field_schema)) {
+ return _variant_to_json(field_schema, field, &metadata, json, present);
+ }
+ return _typed_value_to_json(field_schema, field, metadata, json, present);
+}
+
+Status VariantColumnReader::_typed_struct_to_json(const FieldSchema&
typed_value_field,
+ const Field& field, const
std::string& metadata,
+ std::string* json) const {
+ const auto& values = field.get<TYPE_STRUCT>();
+ json->push_back('{');
+ bool first = true;
+ for (int i = 0; i < typed_value_field.children.size(); ++i) {
+ std::string child_json;
+ bool child_present = false;
+ RETURN_IF_ERROR(_shredded_field_to_json(typed_value_field.children[i],
values[i], metadata,
+ &child_json, &child_present));
+ if (!child_present) {
+ continue;
+ }
+ if (!first) {
+ json->push_back(',');
+ }
+ first = false;
+ append_json_string(typed_value_field.children[i].name, json);
+ json->push_back(':');
+ json->append(child_json);
+ }
+ json->push_back('}');
+ return Status::OK();
+}
+
+Status VariantColumnReader::_typed_array_to_json(const FieldSchema&
typed_value_field,
+ const Field& field, const
std::string& metadata,
+ std::string* json) const {
+ const auto& values = field.get<TYPE_ARRAY>();
+ const FieldSchema* element_schema =
+ typed_value_field.children.empty() ? nullptr :
typed_value_field.children.data();
+ json->push_back('[');
+ for (int i = 0; i < values.size(); ++i) {
+ if (i != 0) {
+ json->push_back(',');
+ }
+ std::string element_json;
+ bool element_present = false;
+ if (element_schema != nullptr) {
+ RETURN_IF_ERROR(_shredded_field_to_json(*element_schema,
values[i], metadata,
+ &element_json,
&element_present));
+ } else {
+ RETURN_IF_ERROR(
+ _field_to_json(typed_value_field, values[i],
&element_json, &element_present));
+ }
+ json->append(element_present ? element_json : "null");
+ }
+ json->push_back(']');
+ return Status::OK();
+}
+
+Status VariantColumnReader::_variant_to_json(const FieldSchema& variant_field,
const Field& field,
+ const std::string*
inherited_metadata,
+ std::string* json, bool* present)
const {
+ if (field.is_null()) {
+ *present = false;
+ return Status::OK();
+ }
+ const auto& values = field.get<TYPE_STRUCT>();
+ int metadata_idx = find_child_idx(variant_field, "metadata");
+ int value_idx = find_child_idx(variant_field, "value");
+ int typed_value_idx = find_child_idx(variant_field, "typed_value");
+
+ std::string metadata;
+ bool has_metadata = false;
+ if (inherited_metadata != nullptr) {
+ metadata = *inherited_metadata;
+ has_metadata = true;
+ }
+ if (metadata_idx >= 0) {
+ bool metadata_present = false;
+ RETURN_IF_ERROR(_get_binary_field(values[metadata_idx], &metadata,
&metadata_present));
+ has_metadata = metadata_present;
+ }
+
+ std::string value_json;
+ bool value_present = false;
+ if (value_idx >= 0) {
+ std::string value;
+ RETURN_IF_ERROR(_get_binary_field(values[value_idx], &value,
&value_present));
+ if (value_present) {
+ if (!has_metadata) {
+ return Status::Corruption("Parquet VARIANT value is present
without metadata");
+ }
+ RETURN_IF_ERROR(parquet::decode_variant_to_json(
+ StringRef(metadata.data(), metadata.size()),
+ StringRef(value.data(), value.size()), &value_json));
+ }
+ }
+
+ std::string typed_json;
+ bool typed_present = false;
+ if (typed_value_idx >= 0) {
+
RETURN_IF_ERROR(_typed_value_to_json(variant_field.children[typed_value_idx],
+ values[typed_value_idx],
metadata, &typed_json,
+ &typed_present));
+ }
+
+ if (typed_present) {
+ if (value_present) {
+ RETURN_IF_ERROR(merge_json_objects(value_json, typed_json, json));
+ } else {
+ *json = std::move(typed_json);
+ }
+ *present = true;
+ return Status::OK();
+ }
+ if (value_present) {
+ *json = std::move(value_json);
+ *present = true;
+ return Status::OK();
+ }
+ *present = false;
+ return Status::OK();
+}
+
+Status VariantColumnReader::read_column_data(
+ ColumnPtr& doris_column, const DataTypePtr& type,
+ const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
FilterMap& filter_map,
+ size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter,
+ int64_t real_column_size) {
+ (void)root_node;
+ if (remove_nullable(type)->get_primitive_type() !=
PrimitiveType::TYPE_VARIANT) {
+ return Status::Corruption(
+ "Wrong data type for column '{}', expected Variant type,
actual type: {}.",
+ _field_schema->name, type->get_name());
+ }
+
+ ColumnPtr struct_column = _variant_struct_type->create_column();
+ const size_t old_struct_rows = struct_column->size();
+ auto const_node = TableSchemaChangeHelper::ConstNode::get_instance();
+ RETURN_IF_ERROR(_struct_reader->read_column_data(struct_column,
_variant_struct_type,
+ const_node, filter_map,
batch_size, read_rows,
+ eof, is_dict_filter,
real_column_size));
+
+ const size_t new_struct_rows = struct_column->size() - old_struct_rows;
+ if (new_struct_rows == 0) {
+ return Status::OK();
+ }
+
+ MutableColumnPtr variant_column_ptr;
+ NullMap* null_map_ptr = nullptr;
+ auto mutable_column = doris_column->assume_mutable();
+ if (doris_column->is_nullable()) {
+ auto* nullable_column =
assert_cast<ColumnNullable*>(mutable_column.get());
+ variant_column_ptr = nullable_column->get_nested_column_ptr();
+ null_map_ptr = &nullable_column->get_null_map_data();
+ } else {
+ if (_field_schema->data_type->is_nullable()) {
+ return Status::Corruption("Not nullable column has null values in
parquet file");
+ }
+ variant_column_ptr = std::move(mutable_column);
+ }
+ auto* variant_column =
assert_cast<ColumnVariant*>(variant_column_ptr.get());
+
+ ParseConfig parse_config;
+ for (size_t i = old_struct_rows; i < struct_column->size(); ++i) {
+ std::string json;
+ bool present = false;
+ RETURN_IF_ERROR(
+ _variant_to_json(*_field_schema, (*struct_column)[i], nullptr,
&json, &present));
Review Comment:
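这里逐个field的处理方式太慢了, 能不能向量化(vectorized)的方式处理
(English: Converting the values one field at a time here is too slow — can this be done in a vectorized way?)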
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]