This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a80adab4971 [FIX](inverted index)fix for array inverted index writer
with large dataset which will cause a core dump (#34076)
a80adab4971 is described below
commit a80adab497179baaf779694c282d09fcfe20ddad
Author: amory <[email protected]>
AuthorDate: Sat Apr 27 12:04:21 2024 +0800
[FIX](inverted index)fix for array inverted index writer with large dataset
which will cause a core dump (#34076)
* fix for array inverted index writer with large dataset which will cause a
core dump
* add cases
* change p1 to p2
* updated
---
.../rowset/segment_v2/inverted_index_writer.cpp | 21 +++--
.../test_array_with_large_dataset.out | 4 +
.../test_array_with_large_dataset.groovy | 95 ++++++++++++++++++++++
3 files changed, 111 insertions(+), 9 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
index e9956008f98..7774dc0c1dd 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
@@ -363,16 +363,15 @@ public:
LOG(ERROR) << "index writer is null in inverted index writer.";
return Status::InternalError("index writer is null in inverted
index writer");
}
+ size_t start_off = 0;
for (int i = 0; i < count; ++i) {
- // offsets[i+1] is now row element count
- // [0, 3, 6]
- // [10,20,30] [20,30,40], [30,40,50]
- auto start_off = offsets[i];
- auto end_off = offsets[i + 1];
+ // nullmap & value ptr-array may not from offsets[i] because
olap_convertor make offsets accumulate from _base_offset which may not is 0,
but nullmap & value in this segment is from 0, we only need
+ // every single array row element size to go through the
nullmap & value ptr-array, and also can go through the every row in array to
keep with _rid++
+ auto array_elem_size = offsets[i + 1] - offsets[i];
// TODO(Amory).later we use object pool to avoid field creation
lucene::document::Field* new_field = nullptr;
CL_NS(analysis)::TokenStream* ts = nullptr;
- for (auto j = start_off; j < end_off; ++j) {
+ for (auto j = start_off; j < start_off + array_elem_size; ++j)
{
if (null_map[j] == 1) {
continue;
}
@@ -405,19 +404,22 @@ public:
_doc->add(*new_field);
}
}
+ start_off += array_elem_size;
if (!_doc->getFields()->empty()) {
// if this array is null, we just ignore to write inverted
index
RETURN_IF_ERROR(add_document());
_doc->clear();
_CLDELETE(ts);
+ } else {
+ RETURN_IF_ERROR(add_null_document());
}
_rid++;
}
} else if constexpr (field_is_numeric_type(field_type)) {
+ size_t start_off = 0;
for (int i = 0; i < count; ++i) {
- auto start_off = offsets[i];
- auto end_off = offsets[i + 1];
- for (size_t j = start_off; j < end_off; ++j) {
+ auto array_elem_size = offsets[i + 1] - offsets[i];
+ for (size_t j = start_off; j < start_off + array_elem_size;
++j) {
if (null_map[j] == 1) {
continue;
}
@@ -428,6 +430,7 @@ public:
_value_key_coder->full_encode_ascending(p, &new_value);
_bkd_writer->add((const uint8_t*)new_value.c_str(),
value_length, _rid);
}
+ start_off += array_elem_size;
_row_ids_seen_for_bkd++;
_rid++;
}
diff --git
a/regression-test/data/inverted_index_p2/test_array_with_large_dataset.out
b/regression-test/data/inverted_index_p2/test_array_with_large_dataset.out
new file mode 100644
index 00000000000..3635d2aa6f8
--- /dev/null
+++ b/regression-test/data/inverted_index_p2/test_array_with_large_dataset.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select --
+2499995
+
diff --git
a/regression-test/suites/inverted_index_p2/test_array_with_large_dataset.groovy
b/regression-test/suites/inverted_index_p2/test_array_with_large_dataset.groovy
new file mode 100644
index 00000000000..4d2f53d5353
--- /dev/null
+++
b/regression-test/suites/inverted_index_p2/test_array_with_large_dataset.groovy
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_array_with_large_dataset", "p2"){
+
+ def StreamLoad = {tableName, fileName ->
+ streamLoad {
+ // you can skip db declaration, because a default db has already
been
+ // specified in ${DORIS_HOME}/conf/regression-conf.groovy
+ // db 'regression_test'
+ table tableName
+
+ // default label is UUID:
+ // set 'label' UUID.randomUUID().toString()
+
+ // default column_separator is specify in doris fe config, usually
is '\t'.
+ // this line change to ','
+ set 'column_separator', '|'
+ set 'max_filter_ratio', '0.3'
+ set 'compress_type', 'GZ'
+
+ // relate to
${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
+ // also, you can stream load a http stream, e.g.
http://xxx/some.csv
+ file fileName
+ time 300000
+ // stream load action will check result, include Success status,
and NumberTotalRows == NumberLoadedRows
+
+ // if declared a check callback, the default check condition will
ignore.
+ // So you must check all condition
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+ }
+
+ // create table
+ sql """ DROP TABLE IF EXISTS bai;"""
+ sql """
+ CREATE TABLE `bai` (
+ `id` BIGINT NULL,
+ `asl` ARRAY<INT> NULL,
+ `ash` ARRAY<INT> NULL,
+ INDEX index_inverted_ail (`asl`) USING INVERTED COMMENT '''''',
+ INDEX index_inverted_aih (`ash`) USING INVERTED COMMENT ''''''
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 16
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "min_load_replica_num" = "-1",
+ "is_being_synced" = "false",
+ "storage_medium" = "hdd",
+ "storage_format" = "V2",
+ "inverted_index_storage_format" = "V1",
+ "light_schema_change" = "true",
+ "disable_auto_compaction" = "false",
+ "enable_single_replica_compaction" = "false",
+ "group_commit_interval_ms" = "10000",
+ "group_commit_data_bytes" = "134217728"
+ );
+ """
+
+ def array_files = ["array_int_1.tar.gz", "array_int_500001.tar.gz",
"array_int_1000001.tar.gz", "array_int_1500001.tar.gz",
"array_int_2000001.tar.gz"]
+ for (f in array_files) {
+ def file_name = "${getS3Url()}/regression/array_index/" + f
+ StreamLoad.call("bai", file_name)
+ }
+ sql """sync"""
+
+ // check result
+ qt_select "SELECT count(*) FROM bai;"
+
+ // download tar.gz file for
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]