This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new bc6af178b3e branch-2.1: [fix](parquet)Fix data column and null map
column not equal when reading Parquet complex type cross-page data #47734
(#48039)
bc6af178b3e is described below
commit bc6af178b3e9cae02874e1a64dc47179ecc172bf
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Feb 19 15:21:13 2025 +0800
branch-2.1: [fix](parquet)Fix data column and null map column not equal
when reading Parquet complex type cross-page data #47734 (#48039)
Cherry-picked from #47734
Co-authored-by: daidai <[email protected]>
---
.../exec/format/parquet/vparquet_column_reader.cpp | 23 ++++++++-
.../exec/format/parquet/vparquet_column_reader.h | 21 ++++++---
.../tvf/{test_tvf_p2.out => test_tvf_p0.out} | Bin 5562 -> 5674 bytes
.../hive/test_parquet_complex_cross_page.out | Bin 0 -> 137 bytes
.../tvf/{test_tvf_p2.groovy => test_tvf_p0.groovy} | 24 +++++++++-
.../hive/test_parquet_complex_cross_page.groovy | 52 +++++++++++++++++++++
6 files changed, 110 insertions(+), 10 deletions(-)
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index fd3200b3640..4e8bcb2c930 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -323,6 +323,18 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr&
doris_column, DataType
// just read the remaining values of the last row in the previous page,
// so no new row should be read.
batch_size = 0;
+ /*
+ * Since the function is repeatedly called to fetch data for the batch
size,
+ * it causes `_rep_levels.resize(0); _def_levels.resize(0);`,
resulting in the
+ * definition and repetition levels of the reader only containing the
latter
+ * part of the batch (i.e., missing some parts). Therefore, when using
the
+ * definition and repetition levels to fill the null_map for structs
and maps,
+ * the function should not be called multiple times before filling.
+ * todo:
+ * We may need to consider reading the entire batch of data at once,
as this approach
+ * would be more user-friendly in terms of function usage. However, we
must consider that if the
+ * data spans multiple pages, memory usage may increase significantly.
+ */
} else {
_rep_levels.resize(0);
_def_levels.resize(0);
@@ -746,7 +758,7 @@ Status StructColumnReader::read_column_data(ColumnPtr&
doris_column, DataTypePtr
continue;
}
- _read_column_names.insert(doris_name);
+ _read_column_names.emplace_back(doris_name);
select_vector.reset();
size_t field_rows = 0;
@@ -758,6 +770,15 @@ Status StructColumnReader::read_column_data(ColumnPtr&
doris_column, DataTypePtr
is_dict_filter));
*read_rows = field_rows;
*eof = field_eof;
+ /*
+ * Considering the issue in the `_read_nested_column` function
where data may span across pages, leading
+ * to missing definition and repetition levels, when filling the
null_map of the struct later, it is
+ * crucial to use the definition and repetition levels from the
first read column
+ * (since `_read_nested_column` is not called repeatedly).
+ *
+ * It is worth mentioning that, theoretically, any sub-column can
be chosen to fill the null_map,
+ * and selecting the shortest one will offer better performance
+ */
} else {
while (field_rows < *read_rows && !field_eof) {
size_t loop_rows = 0;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 4c6e5b1eac9..bac73e36661 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -280,24 +280,30 @@ public:
if (!_read_column_names.empty()) {
// can't use _child_readers[*_read_column_names.begin()]
// because the operator[] of std::unordered_map is not const :(
- return
_child_readers.find(*_read_column_names.begin())->second->get_rep_level();
+ /*
+ * Considering the issue in the `_read_nested_column` function
where data may span across pages, leading
+ * to missing definition and repetition levels, when filling the
null_map of the struct later, it is
+ * crucial to use the definition and repetition levels from the
first read column,
+ * that is `_read_column_names.front()`.
+ */
+ return
_child_readers.find(_read_column_names.front())->second->get_rep_level();
}
return _child_readers.begin()->second->get_rep_level();
}
const std::vector<level_t>& get_def_level() const override {
if (!_read_column_names.empty()) {
- return
_child_readers.find(*_read_column_names.begin())->second->get_def_level();
+ return
_child_readers.find(_read_column_names.front())->second->get_def_level();
}
return _child_readers.begin()->second->get_def_level();
}
Statistics statistics() override {
Statistics st;
- for (const auto& reader : _child_readers) {
- // make sure the field is read
- if (_read_column_names.find(reader.first) !=
_read_column_names.end()) {
- Statistics cst = reader.second->statistics();
+ for (const auto& column_name : _read_column_names) {
+ auto reader = _child_readers.find(column_name);
+ if (reader != _child_readers.end()) {
+ Statistics cst = reader->second->statistics();
st.merge(cst);
}
}
@@ -308,7 +314,8 @@ public:
private:
std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>
_child_readers;
- std::set<std::string> _read_column_names;
+ std::vector<std::string> _read_column_names;
+ // Need to use vector instead of set; see `get_rep_level()` for the reason.
};
}; // namespace doris::vectorized
diff --git a/regression-test/data/external_table_p0/tvf/test_tvf_p2.out
b/regression-test/data/external_table_p0/tvf/test_tvf_p0.out
similarity index 98%
rename from regression-test/data/external_table_p0/tvf/test_tvf_p2.out
rename to regression-test/data/external_table_p0/tvf/test_tvf_p0.out
index 53b454df858..5ec7cc860da 100644
Binary files a/regression-test/data/external_table_p0/tvf/test_tvf_p2.out and
b/regression-test/data/external_table_p0/tvf/test_tvf_p0.out differ
diff --git
a/regression-test/data/external_table_p2/hive/test_parquet_complex_cross_page.out
b/regression-test/data/external_table_p2/hive/test_parquet_complex_cross_page.out
new file mode 100644
index 00000000000..e68420444ba
Binary files /dev/null and
b/regression-test/data/external_table_p2/hive/test_parquet_complex_cross_page.out
differ
diff --git a/regression-test/suites/external_table_p0/tvf/test_tvf_p2.groovy
b/regression-test/suites/external_table_p0/tvf/test_tvf_p0.groovy
similarity index 78%
rename from regression-test/suites/external_table_p0/tvf/test_tvf_p2.groovy
rename to regression-test/suites/external_table_p0/tvf/test_tvf_p0.groovy
index f68fe55e859..990ef03cc50 100644
--- a/regression-test/suites/external_table_p0/tvf/test_tvf_p2.groovy
+++ b/regression-test/suites/external_table_p0/tvf/test_tvf_p0.groovy
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-suite("test_tvf_p2", "p0,external,tvf,external_docker,hive") {
+suite("test_tvf_p0", "p0,external,tvf,external_docker,hive") {
String enabled = context.config.otherConfigs.get("enableHiveTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String nameNodeHost = context.config.otherConfigs.get("externalEnvIp")
@@ -46,7 +46,7 @@ suite("test_tvf_p2", "p0,external,tvf,external_docker,hive") {
"format" = "orc");
"""
- // a row of complex type may be stored across more pages
+ // (1): a row of complex type may be stored across more pages
qt_row_cross_pages """select count(id), count(m1), count(m2)
from hdfs(
"uri" =
"hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages.parquet",
@@ -73,5 +73,25 @@ suite("test_tvf_p2", "p0,external,tvf,external_docker,hive")
{
"format" = "parquet",
"fs.viewfs.mounttable.my-cluster.link./ns1" =
"hdfs://${nameNodeHost}:${hdfsPort}/",
"fs.viewfs.mounttable.my-cluster.homedir" = "/ns1")"""
+
+ // (2): a row of complex type may be stored across more pages
+ qt_row_cross_pages_2 """select count(id), count(experiment)
+ from hdfs(
+ "uri" =
"hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages_2.parquet",
+ "format" = "parquet");
+ """ //149923
+
+ qt_row_cross_pages_3 """select count(id), count(experiment)
+ from hdfs(
+ "uri" =
"hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages_2.parquet",
+ "format" = "parquet") where id > 49923 ;
+ """ // 74815
+
+ qt_row_cross_pages_4 """select count(id), count(experiment)
+ from hdfs(
+ "uri" =
"hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages_2.parquet",
+ "format" = "parquet") where id < 300 ;
+ """ //457
+
}
}
diff --git
a/regression-test/suites/external_table_p2/hive/test_parquet_complex_cross_page.groovy
b/regression-test/suites/external_table_p2/hive/test_parquet_complex_cross_page.groovy
new file mode 100644
index 00000000000..685f5f3204d
--- /dev/null
+++
b/regression-test/suites/external_table_p2/hive/test_parquet_complex_cross_page.groovy
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_parquet_complex_cross_page",
"p2,external,hive,external_remote,external_remote_hive") {
+
+ String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
+ // Hudi and Hive use the same catalog in p2.
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable test")
+ return;
+ }
+
+ String props = context.config.otherConfigs.get("hudiEmrCatalog")
+ String hms_catalog_name = "test_parquet_complex_cross_page"
+
+ sql """drop catalog if exists ${hms_catalog_name};"""
+ sql """
+ CREATE CATALOG IF NOT EXISTS ${hms_catalog_name}
+ PROPERTIES (
+ ${props}
+ ,'hive.version' = '3.1.3'
+ );
+ """
+
+ logger.info("catalog " + hms_catalog_name + " created")
+ sql """switch ${hms_catalog_name};"""
+ logger.info("switched to catalog " + hms_catalog_name)
+ sql """ use regression;"""
+
+ sql """ set dry_run_query=true; """
+
+ qt_1 """ SELECT * FROM test_parquet_complex_cross_page WHERE
device_id='DZ692' and format_time between 1737693770300 and 1737693770500
+ and date between '20250124' and '20250124' and project='GA20230001' ; """
+ qt_2 """ SELECT functions_pnc_ssm_road_di_objects from
test_parquet_complex_cross_page ; """
+ qt_3 """ select * from test_parquet_complex_cross_page ; """
+
+ sql """drop catalog ${hms_catalog_name};"""
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]