This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 66e28778831cbf9bb85154ffab61b0acdeb2a527 Author: Kang <[email protected]> AuthorDate: Sat Sep 16 23:20:40 2023 +0800 Revert "[fix](agg) windown_funnel compatibility issue with multi backends (#24385)" This reverts commit b923eb0771f412d594efde601afb22d5df4eb763. --- be/src/common/config.cpp | 3 + be/src/common/config.h | 3 + .../aggregate_function_window_funnel.h | 26 ++---- .../data/query_p0/aggregate/window_funnel.out | 3 - .../nereids_p0/aggregate/window_funnel.groovy | 35 ++------ .../suites/query_p0/aggregate/window_funnel.groovy | 100 ++------------------- 6 files changed, 28 insertions(+), 142 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 8eb278bf82..e13195396a 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1058,6 +1058,9 @@ DEFINE_mInt64(lookup_connection_cache_bytes_limit, "4294967296"); // level of compression when using LZ4_HC, whose defalut value is LZ4HC_CLEVEL_DEFAULT DEFINE_mInt64(LZ4_HC_compression_level, "9"); +// enable window_funnel_function with different modes +DEFINE_mBool(enable_window_funnel_function_v2, "false"); + DEFINE_Bool(enable_hdfs_hedged_read, "false"); DEFINE_Int32(hdfs_hedged_read_thread_num, "128"); DEFINE_Int32(hdfs_hedged_read_threshold_time, "500"); diff --git a/be/src/common/config.h b/be/src/common/config.h index dbf6734aa6..ab74bf7503 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1104,6 +1104,9 @@ DECLARE_mInt64(lookup_connection_cache_bytes_limit); // level of compression when using LZ4_HC, whose defalut value is LZ4HC_CLEVEL_DEFAULT DECLARE_mInt64(LZ4_HC_compression_level); +// enable window_funnel_function with different modes +DECLARE_mBool(enable_window_funnel_function_v2); + // whether to enable hdfs hedged read. // If set to true, it will be enabled even if user not enable it when creating catalog DECLARE_Bool(enable_hdfs_hedged_read); diff --git a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h index 253677bbc3..a69e5d0cd6 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h +++ b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h @@ -78,7 +78,6 @@ struct WindowFunnelState { bool sorted; int64_t window; WindowFunnelMode window_funnel_mode; - bool enable_mode; WindowFunnelState() { sorted = true; @@ -98,7 +97,7 @@ struct WindowFunnelState { WindowFunnelMode mode) { window = win; max_event_level = event_num; - window_funnel_mode = enable_mode ? mode : WindowFunnelMode::DEFAULT; + window_funnel_mode = mode; if (sorted && events.size() > 0) { if (events.back().first == timestamp) { @@ -204,20 +203,16 @@ struct WindowFunnelState { std::inplace_merge(begin, middle, end); max_event_level = max_event_level > 0 ? max_event_level : other.max_event_level; window = window > 0 ? window : other.window; - if (enable_mode) { - window_funnel_mode = window_funnel_mode == WindowFunnelMode::INVALID - ? other.window_funnel_mode - : window_funnel_mode; - } else { - window_funnel_mode = WindowFunnelMode::DEFAULT; - } + window_funnel_mode = window_funnel_mode == WindowFunnelMode::INVALID + ? other.window_funnel_mode + : window_funnel_mode; sorted = true; } void write(BufferWritable& out) const { write_var_int(max_event_level, out); write_var_int(window, out); - if (enable_mode) { + if (config::enable_window_funnel_function_v2) { write_var_int(static_cast<std::underlying_type_t<WindowFunnelMode>>(window_funnel_mode), out); } @@ -237,7 +232,7 @@ struct WindowFunnelState { max_event_level = (int)event_level; read_var_int(window, in); window_funnel_mode = WindowFunnelMode::DEFAULT; - if (enable_mode) { + if (config::enable_window_funnel_function_v2) { int64_t mode; read_var_int(mode, in); window_funnel_mode = static_cast<WindowFunnelMode>(mode); @@ -267,12 +262,6 @@ public: WindowFunnelState<DateValueType, NativeType>, AggregateFunctionWindowFunnel<DateValueType, NativeType>>(argument_types_) {} - void create(AggregateDataPtr __restrict place) const override { - auto data = new (place) WindowFunnelState<DateValueType, NativeType>(); - /// support window funnel mode from 2.0. See `BeExecVersionManager::max_be_exec_version` - data->enable_mode = version >= 3; - } - String get_name() const override { return "window_funnel"; } DataTypePtr get_return_type() const override { return std::make_shared<DataTypeInt32>(); } @@ -321,9 +310,6 @@ public: AggregateFunctionWindowFunnel<DateValueType, NativeType>>::data(place) .get()); } - -protected: - using IAggregateFunction::version; }; } // namespace doris::vectorized diff --git a/regression-test/data/query_p0/aggregate/window_funnel.out b/regression-test/data/query_p0/aggregate/window_funnel.out index fa239a9e71..3396dd90e8 100644 --- a/regression-test/data/query_p0/aggregate/window_funnel.out +++ b/regression-test/data/query_p0/aggregate/window_funnel.out @@ -11,9 +11,6 @@ -- !window_funnel -- 2 --- !window_funnel_deduplication_compat -- -4 - -- !window_funnel_deduplication -- 2 diff --git a/regression-test/suites/nereids_p0/aggregate/window_funnel.groovy b/regression-test/suites/nereids_p0/aggregate/window_funnel.groovy index 02562c49f4..6ad68bd2a3 100644 --- a/regression-test/suites/nereids_p0/aggregate/window_funnel.groovy +++ b/regression-test/suites/nereids_p0/aggregate/window_funnel.groovy @@ -19,8 +19,6 @@ // /testing/trino-product-tests/src/main/resources/sql-tests/testcases/aggregate // and modified by Doris. -import org.codehaus.groovy.runtime.IOGroovyMethods - suite("window_funnel") { sql "SET enable_nereids_planner=true" sql "SET enable_fallback_to_original_planner=false" @@ -107,31 +105,14 @@ suite("window_funnel") { """ sql """ DROP TABLE IF EXISTS ${tableName} """ - StringBuilder strBuilder = new StringBuilder() - strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword) - strBuilder.append(" http://" + context.config.feHttpAddress + "/rest/v1/config/fe?conf_item=be_exec_version") - - String command = strBuilder.toString() - def process = command.toString().execute() - def code = process.waitFor() - def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream()))); - def out = process.getText() - logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" + err) - assertEquals(code, 0) - def response = parseJson(out.trim()) - assertEquals(response.code, 0) - assertEquals(response.msg, "success") - def configJson = response.data.rows - def beExecVersion = 0 - for (Object conf: configJson) { - assert conf instanceof Map - if (((Map<String, String>) conf).get("Name").toLowerCase() == "be_exec_version") { - beExecVersion = ((Map<String, String>) conf).get("Value").toInteger() - } - } - if (beExecVersion < 3) { - logger.warn("Be exec version(${beExecVersion}) is less than 3, skip window_funnel mode test") - return + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + for (String backendId in backendId_to_backendIP.keySet()) { + String be_host = backendId_to_backendIP[backendId] + String be_http_port = backendId_to_backendHttpPort[backendId] + curl("POST", "http://${be_host}:${be_http_port}/api/update_config?enable_window_funnel_function_v2=true") } sql """ DROP TABLE IF EXISTS ${tableName} """ diff --git a/regression-test/suites/query_p0/aggregate/window_funnel.groovy b/regression-test/suites/query_p0/aggregate/window_funnel.groovy index edefcbc939..1fc4cf555b 100644 --- a/regression-test/suites/query_p0/aggregate/window_funnel.groovy +++ b/regression-test/suites/query_p0/aggregate/window_funnel.groovy @@ -19,8 +19,6 @@ // /testing/trino-product-tests/src/main/resources/sql-tests/testcases/aggregate // and modified by Doris. -import org.codehaus.groovy.runtime.IOGroovyMethods - suite("window_funnel") { sql "SET enable_nereids_planner=false" def tableName = "windowfunnel_test" @@ -106,96 +104,14 @@ suite("window_funnel") { """ sql """ DROP TABLE IF EXISTS ${tableName} """ - StringBuilder strBuilder = new StringBuilder() - strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword) - strBuilder.append(" http://" + context.config.feHttpAddress + "/rest/v1/config/fe?conf_item=be_exec_version") - - def command = strBuilder.toString() - def process = command.toString().execute() - def code = process.waitFor() - def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream()))); - def out = process.getText() - logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" + err) - assertEquals(code, 0) - def response = parseJson(out.trim()) - assertEquals(response.code, 0) - assertEquals(response.msg, "success") - def configJson = response.data.rows - def beExecVersion = 0 - for (Object conf: configJson) { - assert conf instanceof Map - if (((Map<String, String>) conf).get("Name").toLowerCase() == "be_exec_version") { - beExecVersion = ((Map<String, String>) conf).get("Value").toInteger() - } - } - if (beExecVersion < 3) { - sql """ DROP TABLE IF EXISTS ${tableName} """ - sql """ - CREATE TABLE IF NOT EXISTS ${tableName} ( - xwho varchar(50) NULL COMMENT 'xwho', - xwhen datetimev2(3) COMMENT 'xwhen', - xwhat int NULL COMMENT 'xwhat' - ) - DUPLICATE KEY(xwho) - DISTRIBUTED BY HASH(xwho) BUCKETS 3 - PROPERTIES ( - "replication_num" = "1" - ); - """ - sql "INSERT into ${tableName} (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 10:41:00.111111', 1)" - sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 13:28:02.111111', 2)" - sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 13:28:03.111111', 2)" - sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 14:15:01.111111', 3)" - sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 15:05:04.111111', 4)" - qt_window_funnel_deduplication_compat """ - select - window_funnel( - 20000, - 'deduplication', - t.xwhen, - t.xwhat = 1, - t.xwhat = 2, - t.xwhat = 3, - t.xwhat = 4 - ) AS level - from ${tableName} t; - """ - sql """ DROP TABLE IF EXISTS ${tableName} """ - logger.warn("Be exec version(${beExecVersion}) is less than 3, skip window_funnel mode test") - return - } else { - sql """ DROP TABLE IF EXISTS ${tableName} """ - sql """ - CREATE TABLE IF NOT EXISTS ${tableName} ( - xwho varchar(50) NULL COMMENT 'xwho', - xwhen datetimev2(3) COMMENT 'xwhen', - xwhat int NULL COMMENT 'xwhat' - ) - DUPLICATE KEY(xwho) - DISTRIBUTED BY HASH(xwho) BUCKETS 3 - PROPERTIES ( - "replication_num" = "1" - ); - """ - sql "INSERT into ${tableName} (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 10:41:00.111111', 1)" - sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 13:28:02.111111', 2)" - sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 13:28:03.111111', 2)" - sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 14:15:01.111111', 3)" - sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 15:05:04.111111', 4)" - qt_window_funnel_deduplication_compat """ - select - window_funnel( - 20000, - 'default', - t.xwhen, - t.xwhat = 1, - t.xwhat = 2, - t.xwhat = 3, - t.xwhat = 4 - ) AS level - from ${tableName} t; - """ - sql """ DROP TABLE IF EXISTS ${tableName} """ + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + for (String backendId in backendId_to_backendIP.keySet()) { + String be_host = backendId_to_backendIP[backendId] + String be_http_port = backendId_to_backendHttpPort[backendId] + curl("POST", "http://${be_host}:${be_http_port}/api/update_config?enable_window_funnel_function_v2=true") } sql """ DROP TABLE IF EXISTS ${tableName} """ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
