This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new f67ed9f0a4a [fix](iceberg) iceberg use custom method to encode 
special characters of field name (#27108) (#27205)
f67ed9f0a4a is described below

commit f67ed9f0a4aa55a4c9a131648132dd52f9ee98af
Author: Ashin Gau <[email protected]>
AuthorDate: Fri Nov 17 21:21:00 2023 +0800

    [fix](iceberg) iceberg use custom method to encode special characters of 
field name (#27108) (#27205)
    
    Fix two bugs:
    1. Missing-column lookup is case sensitive; change the column name to lower 
case in FE for hive/iceberg/hudi
    2. Iceberg use custom method to encode special characters in column name. 
Decode the column name to match the right column in parquet reader.
---
 be/src/vec/exec/format/parquet/schema_desc.cpp     | 67 ++++++++++++++++++++++
 be/src/vec/exec/format/parquet/schema_desc.h       |  4 ++
 .../exec/format/parquet/vparquet_file_metadata.h   |  3 +
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  6 ++
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  3 +
 be/src/vec/exec/format/table/iceberg_reader.cpp    |  1 +
 be/src/vec/exec/scan/vfile_scanner.cpp             | 22 +++----
 be/src/vec/exec/scan/vfile_scanner.h               |  6 +-
 .../doris/catalog/external/HMSExternalTable.java   |  6 +-
 .../catalog/external/IcebergExternalTable.java     |  3 +-
 .../test_external_catalog_iceberg_common.out       | 10 +++-
 .../iceberg/test_external_catalog_iceberg_common   |  3 +
 12 files changed, 112 insertions(+), 22 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/schema_desc.cpp 
b/be/src/vec/exec/format/parquet/schema_desc.cpp
index 1b830689c0e..c9283c62889 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.cpp
+++ b/be/src/vec/exec/format/parquet/schema_desc.cpp
@@ -26,6 +26,7 @@
 #include "common/logging.h"
 #include "runtime/define_primitive_type.h"
 #include "util/slice.h"
+#include "util/string_util.h"
 
 namespace doris::vectorized {
 
@@ -239,6 +240,72 @@ TypeDescriptor FieldDescriptor::get_doris_type(const 
tparquet::SchemaElement& ph
     return type;
 }
 
// Copy from org.apache.iceberg.avro.AvroSchemaUtil#validAvroName
// A valid avro name starts with a letter or '_' and contains only
// letters, digits and '_'. Anything else was sanitized by Iceberg on write.
static bool is_valid_avro_name(const std::string& name) {
    if (name.empty()) {
        return false;
    }
    // Cast to unsigned char before calling <cctype> functions: passing a
    // plain char with a negative value (e.g. a UTF-8 multi-byte unit on
    // platforms where char is signed) is undefined behavior.
    auto first = static_cast<unsigned char>(name[0]);
    if (!isalpha(first) && first != '_') {
        return false;
    }
    for (size_t i = 1; i < name.length(); i++) {
        auto character = static_cast<unsigned char>(name[i]);
        if (!isalpha(character) && !isdigit(character) && character != '_') {
            return false;
        }
    }
    return true;
}
+
// Copy from org.apache.iceberg.avro.AvroSchemaUtil#sanitize
// Encode one invalid character: a digit (only possible in first position)
// becomes "_<digit>", everything else becomes "_x<lowercase hex of the byte>".
static void sanitize_avro_name(std::ostringstream& buf, char character) {
    // Work on the unsigned byte value: isdigit on a negative char is UB, and
    // streaming a sign-extended int would print "ffffff.." for bytes >= 0x80
    // instead of the two-digit code Java's Integer.toHexString produces.
    auto c = static_cast<unsigned char>(character);
    if (isdigit(c)) {
        buf << '_' << character;
    } else {
        // std::hex emits lowercase digits by default, so no explicit
        // to-lower pass is needed to match the Java implementation.
        std::stringstream ss;
        ss << std::hex << static_cast<int>(c);
        buf << "_x" << ss.str();
    }
}

// Copy from org.apache.iceberg.avro.AvroSchemaUtil#sanitize
// Re-create the sanitized (avro-safe) form of a column name, so it can be
// matched against the field names Iceberg actually wrote into the file.
static std::string sanitize_avro_name(const std::string& name) {
    std::ostringstream buf;
    char first = name[0];
    if (!isalpha(static_cast<unsigned char>(first)) && first != '_') {
        sanitize_avro_name(buf, first);
    } else {
        buf << first;
    }

    for (size_t i = 1; i < name.length(); i++) {
        char character = name[i];
        if (!isalpha(static_cast<unsigned char>(character)) &&
            !isdigit(static_cast<unsigned char>(character)) && character != '_') {
            sanitize_avro_name(buf, character);
        } else {
            buf << character;
        }
    }
    return buf.str();
}
+
// Iceberg's writer stores sanitized (avro-safe) field names in the parquet
// schema, while Doris requests the original column names. For each requested
// column whose name is not a valid avro name, look up its sanitized variant in
// the schema map and rename that field back to the original name so later
// lookups by column name succeed.
void FieldDescriptor::iceberg_sanitize(const std::vector<std::string>& read_columns) {
    for (const std::string& col : read_columns) {
        if (!is_valid_avro_name(col)) {
            std::string sanitize_name = sanitize_avro_name(col);
            auto it = _name_to_field.find(sanitize_name);
            if (it != _name_to_field.end()) {
                // NOTE(review): const_cast assumes the mapped FieldSchema is a
                // mutable object owned by this descriptor — confirm ownership.
                FieldSchema* schema = const_cast<FieldSchema*>(it->second);
                schema->name = col;
                // Re-key the entry under the original column name. The raw
                // pointer stays valid across emplace/erase (a rehash only
                // invalidates iterators, not the stored values).
                _name_to_field.emplace(col, schema);
                _name_to_field.erase(sanitize_name);
            }
        }
    }
}
+
 TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::LogicalType 
logicalType) {
     TypeDescriptor type;
     if (logicalType.__isset.STRING) {
diff --git a/be/src/vec/exec/format/parquet/schema_desc.h 
b/be/src/vec/exec/format/parquet/schema_desc.h
index fb61ad918a7..d763e40e2ed 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.h
+++ b/be/src/vec/exec/format/parquet/schema_desc.h
@@ -91,6 +91,10 @@ private:
     TypeDescriptor get_doris_type(const tparquet::SchemaElement& 
physical_schema);
 
 public:
+    // org.apache.iceberg.avro.AvroSchemaUtil#sanitize will encode special 
characters,
+    // we have to decode these characters
+    void iceberg_sanitize(const std::vector<std::string>& read_columns);
+
     FieldDescriptor() = default;
     ~FieldDescriptor() = default;
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h 
b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
index 6f52ef5b4af..5d745a0db62 100644
--- a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
+++ b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
@@ -32,6 +32,9 @@ public:
     Status init_schema();
     const FieldDescriptor& schema() const { return _schema; }
     const tparquet::FileMetaData& to_thrift();
+    void iceberg_sanitize(const std::vector<std::string>& read_columns) {
+        _schema.iceberg_sanitize(read_columns);
+    }
     std::string debug_string() const;
 
 private:
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 49798ed4f13..124f623f2e9 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -294,6 +294,12 @@ void ParquetReader::_init_file_description() {
     }
 }
 
+void ParquetReader::iceberg_sanitize(const std::vector<std::string>& 
read_columns) {
+    if (_file_metadata != nullptr) {
+        _file_metadata->iceberg_sanitize(read_columns);
+    }
+}
+
 Status ParquetReader::init_reader(
         const std::vector<std::string>& all_column_names,
         const std::vector<std::string>& missing_column_names,
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 5ceca55d7ec..6efd0bd7237 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -139,6 +139,9 @@ public:
 
     const tparquet::FileMetaData* get_meta_data() const { return _t_metadata; }
 
+    // Only for iceberg reader to sanitize invalid column names
+    void iceberg_sanitize(const std::vector<std::string>& read_columns);
+
     Status set_fill_columns(
             const std::unordered_map<std::string, std::tuple<std::string, 
const SlotDescriptor*>>&
                     partition_columns,
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp 
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 9f98a0ae3f4..bcaa99143f0 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -129,6 +129,7 @@ Status IcebergTableReader::init_reader(
     _gen_file_col_names();
     _gen_new_colname_to_value_range();
     parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
+    parquet_reader->iceberg_sanitize(_all_required_col_names);
     Status status = parquet_reader->init_reader(
             _all_required_col_names, _not_in_file_col_names, 
&_new_colname_to_value_range,
             conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 7f35422ec03..fd77b084bb9 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -418,7 +418,7 @@ Status VFileScanner::_cast_to_input_block(Block* block) {
 }
 
 Status VFileScanner::_fill_columns_from_path(size_t rows) {
-    for (auto& kv : *_partition_columns) {
+    for (auto& kv : _partition_col_descs) {
         auto doris_column = _src_block_ptr->get_by_name(kv.first).column;
         IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
         auto& [value, slot_desc] = kv.second;
@@ -437,7 +437,7 @@ Status VFileScanner::_fill_missing_columns(size_t rows) {
     }
 
     SCOPED_TIMER(_fill_missing_columns_timer);
-    for (auto& kv : *_missing_columns) {
+    for (auto& kv : _missing_col_descs) {
         if (kv.second == nullptr) {
             // no default column, fill with null
             auto nullable_column = 
reinterpret_cast<vectorized::ColumnNullable*>(
@@ -862,9 +862,8 @@ Status VFileScanner::_get_next_reader() {
 }
 
 Status VFileScanner::_generate_fill_columns() {
-    _partition_columns.reset(
-            new std::unordered_map<std::string, std::tuple<std::string, const 
SlotDescriptor*>>());
-    _missing_columns.reset(new std::unordered_map<std::string, 
VExprContextSPtr>());
+    _partition_col_descs.clear();
+    _missing_col_descs.clear();
 
     const TFileRangeDesc& range = _ranges.at(_next_range - 1);
     if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
@@ -881,8 +880,8 @@ Status VFileScanner::_generate_fill_columns() {
                 if (size == 4 && memcmp(data, "null", 4) == 0) {
                     data = TextConverter::NULL_STR;
                 }
-                _partition_columns->emplace(slot_desc->col_name(),
-                                            std::make_tuple(data, slot_desc));
+                _partition_col_descs.emplace(slot_desc->col_name(),
+                                             std::make_tuple(data, slot_desc));
             }
         }
     }
@@ -901,16 +900,11 @@ Status VFileScanner::_generate_fill_columns() {
                 return Status::InternalError("failed to find default value 
expr for slot: {}",
                                              slot_desc->col_name());
             }
-            _missing_columns->emplace(slot_desc->col_name(), it->second);
+            _missing_col_descs.emplace(slot_desc->col_name(), it->second);
         }
     }
 
-    RETURN_IF_ERROR(_cur_reader->set_fill_columns(*_partition_columns, 
*_missing_columns));
-    if (_cur_reader->fill_all_columns()) {
-        _partition_columns.reset(nullptr);
-        _missing_columns.reset(nullptr);
-    }
-    return Status::OK();
+    return _cur_reader->set_fill_columns(_partition_col_descs, 
_missing_col_descs);
 }
 
 Status VFileScanner::_init_expr_ctxes() {
diff --git a/be/src/vec/exec/scan/vfile_scanner.h 
b/be/src/vec/exec/scan/vfile_scanner.h
index 3611785625d..e3533ce05c2 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -162,9 +162,9 @@ protected:
     std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
     std::unique_ptr<io::IOContext> _io_ctx;
 
-    std::unique_ptr<std::unordered_map<std::string, std::tuple<std::string, 
const SlotDescriptor*>>>
-            _partition_columns;
-    std::unique_ptr<std::unordered_map<std::string, VExprContextSPtr>> 
_missing_columns;
+    std::unordered_map<std::string, std::tuple<std::string, const 
SlotDescriptor*>>
+            _partition_col_descs;
+    std::unordered_map<std::string, VExprContextSPtr> _missing_col_descs;
 
 private:
     RuntimeProfile::Counter* _get_block_timer = nullptr;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 2729fdf7a95..0243ad12f75 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -435,7 +435,7 @@ public class HMSExternalTable extends ExternalTable {
         } else {
             List<Column> tmpSchema = 
Lists.newArrayListWithCapacity(schema.size());
             for (FieldSchema field : schema) {
-                tmpSchema.add(new Column(field.getName(),
+                tmpSchema.add(new 
Column(field.getName().toLowerCase(Locale.ROOT),
                         
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
                         true, field.getComment(), true, -1));
             }
@@ -484,7 +484,7 @@ public class HMSExternalTable extends ExternalTable {
         Schema schema = icebergTable.schema();
         List<Column> tmpSchema = 
Lists.newArrayListWithCapacity(hmsSchema.size());
         for (FieldSchema field : hmsSchema) {
-            tmpSchema.add(new Column(field.getName(),
+            tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
                     
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType(),
                             IcebergExternalTable.ICEBERG_DATETIME_SCALE_MS),
                     true, null, true, false, null, field.getComment(), true, 
null,
@@ -500,7 +500,7 @@ public class HMSExternalTable extends ExternalTable {
         for (String partitionKey : partitionKeys) {
             // Do not use "getColumn()", which will cause dead loop
             for (Column column : schema) {
-                if (partitionKey.equals(column.getName())) {
+                if (partitionKey.equalsIgnoreCase(column.getName())) {
                     // For partition column, if it is string type, change it 
to varchar(65535)
                     // to be same as doris managed table.
                     // This is to avoid some unexpected behavior such as 
different partition pruning result
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
index bede9b99e43..7398ff19c9e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.types.Types;
 
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Optional;
 
 public class IcebergExternalTable extends ExternalTable {
@@ -66,7 +67,7 @@ public class IcebergExternalTable extends ExternalTable {
             List<Types.NestedField> columns = schema.columns();
             List<Column> tmpSchema = 
Lists.newArrayListWithCapacity(columns.size());
             for (Types.NestedField field : columns) {
-                tmpSchema.add(new Column(field.name(),
+                tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
                         icebergTypeToDorisType(field.type()), true, null, 
true, field.doc(), true,
                         
schema.caseInsensitiveFindField(field.name()).fieldId()));
             }
diff --git 
a/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_common.out
 
b/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_common.out
index 9554f1d21f0..a51bac0e1b5 100644
--- 
a/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_common.out
+++ 
b/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_common.out
@@ -1,3 +1,11 @@
 -- This file is automatically generated. You should know what you did if you 
want to edit this
 -- !q01 --
-599715
\ No newline at end of file
+599715
+
+-- !sanitize_mara --
+MATNR1 3.140   /DSD/SV_CNT_GRP1
+MATNR2 3.240   /DSD/SV_CNT_GRP2
+MATNR4 3.440   /DSD/SV_CNT_GRP4
+MATNR5 3.540   /DSD/SV_CNT_GRP5
+MATNR6 3.640   /DSD/SV_CNT_GRP6
+
diff --git 
a/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_common
 
b/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_common
index a035ea6d1b3..577a4e6702a 100644
--- 
a/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_common
+++ 
b/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_common
@@ -46,5 +46,8 @@ suite("test_external_catalog_iceberg_common", 
"p2,external,iceberg,external_remo
         }
         sql """ use `iceberg_catalog`; """
         q01_parquet()
+
+        // test the special characters in table fields
+        qt_sanitize_mara """select MaTnR, NtgEW, `/dsd/Sv_cnt_grP` from 
sanitize_mara order by mAtNr"""
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to