This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new caf6efd9363 [Pick](branch-2.0) pick two PRs to fix memtable shrink
#28536 #28660 (#28671)
caf6efd9363 is described below
commit caf6efd936351187f54828bf7d834dce005cc8bb
Author: lihangyu <[email protected]>
AuthorDate: Wed Dec 20 11:47:49 2023 +0800
[Pick](branch-2.0) pick two PRs to fix memtable shrink #28536 #28660
(#28671)
* [Fix](memtable) fix `shrink_memtable_by_agg` should also update
`_row_in_blocks` (#28536)
* [Fix](memtable) fix `shrink_memtable_by_agg` without duplicated keys
(#28660)
---
be/src/common/config.cpp | 2 +-
be/src/common/config.h | 2 +-
be/src/olap/memtable.cpp | 7 +-
.../test_insert_with_aggregation_memtable.out | 17 +++
.../test_insert_with_aggregation_memtable.groovy | 141 +++++++++++++++++++++
5 files changed, 163 insertions(+), 6 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index dafc613d77c..40f5a2b6b66 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1030,7 +1030,7 @@ DEFINE_mInt32(s3_write_buffer_whole_size, "524288000");
DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");
//disable shrink memory by default
-DEFINE_Bool(enable_shrink_memory, "false");
+DEFINE_mBool(enable_shrink_memory, "false");
DEFINE_mInt32(schema_cache_capacity, "1024");
DEFINE_mInt32(schema_cache_sweep_time_sec, "100");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 65b9de8f12e..7d7119ecfc1 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1067,7 +1067,7 @@ DECLARE_mInt32(s3_write_buffer_whole_size);
// the max number of cached file handles for block segment
DECLARE_mInt64(file_cache_max_file_reader_cache_size);
//enable shrink memory
-DECLARE_Bool(enable_shrink_memory);
+DECLARE_mBool(enable_shrink_memory);
// enable cache for high concurrent point query work load
DECLARE_mInt32(schema_cache_capacity);
DECLARE_mInt32(schema_cache_sweep_time_sec);
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 880609b053f..ea3c0fcf6b6 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -425,6 +425,8 @@ void MemTable::_aggregate() {
_output_mutable_block =
vectorized::MutableBlock::build_mutable_block(empty_input_block.get());
_output_mutable_block.clear_column_data();
+ _row_in_blocks = temp_row_in_blocks;
+ _last_sorted_pos = _row_in_blocks.size();
}
}
@@ -434,10 +436,7 @@ void MemTable::shrink_memtable_by_agg() {
return;
}
size_t same_keys_num = _sort();
- if (same_keys_num == 0) {
- vectorized::Block in_block = _input_mutable_block.to_block();
- _put_into_output(in_block);
- } else {
+ if (same_keys_num != 0) {
_aggregate<false>();
}
}
diff --git
a/regression-test/data/load_p0/insert/test_insert_with_aggregation_memtable.out
b/regression-test/data/load_p0/insert/test_insert_with_aggregation_memtable.out
new file mode 100644
index 00000000000..030677444b3
--- /dev/null
+++
b/regression-test/data/load_p0/insert/test_insert_with_aggregation_memtable.out
@@ -0,0 +1,17 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+10000 2017-10-01 北京 20 0 2017-10-01T07:00 35
10 2
+10001 2017-10-01 北京 30 1 2017-10-01T17:05:45 2
22 22
+10002 2017-10-02 上海 20 1 2017-10-02T12:59:12 200
5 5
+10003 2017-10-02 广州 32 0 2017-10-02T11:20 30
11 11
+10004 2017-10-01 深圳 35 0 2017-10-01T10:00:15 100
3 3
+10004 2017-10-03 深圳 35 0 2017-10-03T10:20:22 11
6 6
+
+-- !sql --
+10000 2017-10-01 北京 20 0 2017-10-01T07:00 35
10 2
+10001 2017-10-01 北京 30 1 2017-10-01T17:05:45 2
22 22
+10002 2017-10-02 上海 20 1 2017-10-02T12:59:12 200
5 5
+10003 2017-10-02 广州 32 0 2017-10-02T11:20 30
11 11
+10004 2017-10-01 深圳 35 0 2017-10-01T10:00:15 100
3 3
+10004 2017-10-03 深圳 35 0 2017-10-03T10:20:22 11
6 6
+
diff --git
a/regression-test/suites/load_p0/insert/test_insert_with_aggregation_memtable.groovy
b/regression-test/suites/load_p0/insert/test_insert_with_aggregation_memtable.groovy
new file mode 100644
index 00000000000..bbfca8fa5f6
--- /dev/null
+++
b/regression-test/suites/load_p0/insert/test_insert_with_aggregation_memtable.groovy
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_insert_with_aggregation_memtable", "nonConcurrent") {
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ def backendId_to_params = [string:[:]]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ def set_be_param = { paramName, paramValue ->
+        // for each BE node, set paramName=paramValue
+ for (String id in backendId_to_backendIP.keySet()) {
+ def beIp = backendId_to_backendIP.get(id)
+ def bePort = backendId_to_backendHttpPort.get(id)
+ def (code, out, err) = curl("POST",
String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName,
paramValue))
+ assertTrue(out.contains("OK"))
+ }
+ }
+
+ def reset_be_param = { paramName ->
+        // for each BE node, reset paramName to its default value
+ for (String id in backendId_to_backendIP.keySet()) {
+ def beIp = backendId_to_backendIP.get(id)
+ def bePort = backendId_to_backendHttpPort.get(id)
+ def original_value = backendId_to_params.get(id).get(paramName)
+ def (code, out, err) = curl("POST",
String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName,
original_value))
+ assertTrue(out.contains("OK"))
+ }
+ }
+
+ def get_be_param = { paramName ->
+        // for each BE node, get the current value of paramName
+ def paramValue = ""
+ for (String id in backendId_to_backendIP.keySet()) {
+ def beIp = backendId_to_backendIP.get(id)
+ def bePort = backendId_to_backendHttpPort.get(id)
+ // get the config value from be
+ def (code, out, err) = curl("GET",
String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort,
paramName))
+ assertTrue(code == 0)
+ assertTrue(out.contains(paramName))
+ // parsing
+ def resultList = parseJson(out)[0]
+ assertTrue(resultList.size() == 4)
+ // get original value
+ paramValue = resultList[2]
+ backendId_to_params.get(id, [:]).put(paramName, paramValue)
+ }
+ }
+
+ def testTable = "test_memtable_enable_with_aggregate"
+ sql """ DROP TABLE IF EXISTS ${testTable}"""
+ def testTableDDL = """
+ create table ${testTable}
+ (
+ `id` LARGEINT NOT NULL,
+ `k1` DATE NOT NULL,
+ `k2` VARCHAR(20),
+ `k3` SMALLINT,
+ `k4` TINYINT,
+ `k5` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00",
+ `k6` BIGINT SUM DEFAULT "0",
+ `k7` INT MAX DEFAULT "0",
+ `k8` INT MIN DEFAULT "99999"
+ )
+ AGGREGATE KEY(`id`, `k1`, `k2`, `k3`, `k4`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+ def insert_sql = """
+ insert into ${testTable} values
+ (10000,"2017-10-01","北京",20,0,"2017-10-01 06:00:00",20,10,10),
+ (10000,"2017-10-01","北京",20,0,"2017-10-01 07:00:00",15,2,2),
+ (10001,"2017-10-01","北京",30,1,"2017-10-01 17:05:45",2,22,22),
+ (10002,"2017-10-02","上海",20,1,"2017-10-02 12:59:12",200,5,5),
+ (10003,"2017-10-02","广州",32,0,"2017-10-02 11:20:00",30,11,11),
+ (10004,"2017-10-01","深圳",35,0,"2017-10-01 10:00:15",100,3,3),
+ (10004,"2017-10-03","深圳",35,0,"2017-10-03 10:20:22",11,6,6);
+ """
+
+ sql testTableDDL
+ sql "sync"
+ sql insert_sql
+ sql "sync"
+ qt_sql "select * from ${testTable} order by id asc"
+
+ // store the original value
+ get_be_param("enable_shrink_memory")
+ get_be_param("write_buffer_size_for_agg")
+
+ // the original value is false
+ set_be_param("enable_shrink_memory", "true")
+ // the original value is 400MB
+ set_be_param("write_buffer_size_for_agg", "512") // change it to 0.5KB
+ sql """ DROP TABLE IF EXISTS ${testTable}"""
+ sql testTableDDL
+ sql "sync"
+ sql insert_sql
+ sql "sync"
+ qt_sql "select * from ${testTable} order by id asc"
+
+ // test with mv
+ def table_name = "agg_shrink"
+ sql "DROP TABLE IF EXISTS ${table_name}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_name} (
+ k bigint,
+ v text
+ )
+ DUPLICATE KEY(`k`)
+ DISTRIBUTED BY HASH(k) BUCKETS 4
+ properties("replication_num" = "1");
+ """
+ set_be_param("write_buffer_size_for_agg", "10240") // change it to 10KB
+ sql """INSERT INTO ${table_name} SELECT *, '{"k1":1, "k2": "hello world",
"k3" : [1234], "k4" : 1.10000, "k5" : [[123]]}' FROM numbers("number" =
"4096")"""
+ sql """INSERT INTO ${table_name} SELECT k, v from ${table_name}"""
+ sql """INSERT INTO ${table_name} SELECT k, v from ${table_name}"""
+ createMV("""create materialized view var_cnt as select k, count(k) from
${table_name} group by k""")
+ sql """INSERT INTO ${table_name} SELECT k, v from ${table_name} limit
8101"""
+ // insert with no duplicate
+ sql """INSERT INTO ${table_name} SELECT *, '{"k1":1, "k2": "hello world",
"k3" : [1234], "k4" : 1.10000, "k5" : [[123]]}' FROM numbers("number" =
"4096"); """
+
+ reset_be_param("enable_shrink_memory")
+ reset_be_param("write_buffer_size_for_agg")
+
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]