This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 66e28778831cbf9bb85154ffab61b0acdeb2a527
Author: Kang <[email protected]>
AuthorDate: Sat Sep 16 23:20:40 2023 +0800

    Revert "[fix](agg) windown_funnel compatibility issue with multi backends 
(#24385)"
    
    This reverts commit b923eb0771f412d594efde601afb22d5df4eb763.
---
 be/src/common/config.cpp                           |   3 +
 be/src/common/config.h                             |   3 +
 .../aggregate_function_window_funnel.h             |  26 ++----
 .../data/query_p0/aggregate/window_funnel.out      |   3 -
 .../nereids_p0/aggregate/window_funnel.groovy      |  35 ++------
 .../suites/query_p0/aggregate/window_funnel.groovy | 100 ++-------------------
 6 files changed, 28 insertions(+), 142 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 8eb278bf82..e13195396a 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1058,6 +1058,9 @@ DEFINE_mInt64(lookup_connection_cache_bytes_limit, 
"4294967296");
 // level of compression when using LZ4_HC, whose defalut value is 
LZ4HC_CLEVEL_DEFAULT
 DEFINE_mInt64(LZ4_HC_compression_level, "9");
 
+// enable window_funnel_function with different modes
+DEFINE_mBool(enable_window_funnel_function_v2, "false");
+
 DEFINE_Bool(enable_hdfs_hedged_read, "false");
 DEFINE_Int32(hdfs_hedged_read_thread_num, "128");
 DEFINE_Int32(hdfs_hedged_read_threshold_time, "500");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index dbf6734aa6..ab74bf7503 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1104,6 +1104,9 @@ DECLARE_mInt64(lookup_connection_cache_bytes_limit);
 // level of compression when using LZ4_HC, whose defalut value is 
LZ4HC_CLEVEL_DEFAULT
 DECLARE_mInt64(LZ4_HC_compression_level);
 
+// enable window_funnel_function with different modes
+DECLARE_mBool(enable_window_funnel_function_v2);
+
 // whether to enable hdfs hedged read.
 // If set to true, it will be enabled even if user not enable it when creating 
catalog
 DECLARE_Bool(enable_hdfs_hedged_read);
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h 
b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h
index 253677bbc3..a69e5d0cd6 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h
@@ -78,7 +78,6 @@ struct WindowFunnelState {
     bool sorted;
     int64_t window;
     WindowFunnelMode window_funnel_mode;
-    bool enable_mode;
 
     WindowFunnelState() {
         sorted = true;
@@ -98,7 +97,7 @@ struct WindowFunnelState {
              WindowFunnelMode mode) {
         window = win;
         max_event_level = event_num;
-        window_funnel_mode = enable_mode ? mode : WindowFunnelMode::DEFAULT;
+        window_funnel_mode = mode;
 
         if (sorted && events.size() > 0) {
             if (events.back().first == timestamp) {
@@ -204,20 +203,16 @@ struct WindowFunnelState {
         std::inplace_merge(begin, middle, end);
         max_event_level = max_event_level > 0 ? max_event_level : 
other.max_event_level;
         window = window > 0 ? window : other.window;
-        if (enable_mode) {
-            window_funnel_mode = window_funnel_mode == 
WindowFunnelMode::INVALID
-                                         ? other.window_funnel_mode
-                                         : window_funnel_mode;
-        } else {
-            window_funnel_mode = WindowFunnelMode::DEFAULT;
-        }
+        window_funnel_mode = window_funnel_mode == WindowFunnelMode::INVALID
+                                     ? other.window_funnel_mode
+                                     : window_funnel_mode;
         sorted = true;
     }
 
     void write(BufferWritable& out) const {
         write_var_int(max_event_level, out);
         write_var_int(window, out);
-        if (enable_mode) {
+        if (config::enable_window_funnel_function_v2) {
             
write_var_int(static_cast<std::underlying_type_t<WindowFunnelMode>>(window_funnel_mode),
                           out);
         }
@@ -237,7 +232,7 @@ struct WindowFunnelState {
         max_event_level = (int)event_level;
         read_var_int(window, in);
         window_funnel_mode = WindowFunnelMode::DEFAULT;
-        if (enable_mode) {
+        if (config::enable_window_funnel_function_v2) {
             int64_t mode;
             read_var_int(mode, in);
             window_funnel_mode = static_cast<WindowFunnelMode>(mode);
@@ -267,12 +262,6 @@ public:
                       WindowFunnelState<DateValueType, NativeType>,
                       AggregateFunctionWindowFunnel<DateValueType, 
NativeType>>(argument_types_) {}
 
-    void create(AggregateDataPtr __restrict place) const override {
-        auto data = new (place) WindowFunnelState<DateValueType, NativeType>();
-        /// support window funnel mode from 2.0. See 
`BeExecVersionManager::max_be_exec_version`
-        data->enable_mode = version >= 3;
-    }
-
     String get_name() const override { return "window_funnel"; }
 
     DataTypePtr get_return_type() const override { return 
std::make_shared<DataTypeInt32>(); }
@@ -321,9 +310,6 @@ public:
                         AggregateFunctionWindowFunnel<DateValueType, 
NativeType>>::data(place)
                         .get());
     }
-
-protected:
-    using IAggregateFunction::version;
 };
 
 } // namespace doris::vectorized
diff --git a/regression-test/data/query_p0/aggregate/window_funnel.out 
b/regression-test/data/query_p0/aggregate/window_funnel.out
index fa239a9e71..3396dd90e8 100644
--- a/regression-test/data/query_p0/aggregate/window_funnel.out
+++ b/regression-test/data/query_p0/aggregate/window_funnel.out
@@ -11,9 +11,6 @@
 -- !window_funnel --
 2
 
--- !window_funnel_deduplication_compat --
-4
-
 -- !window_funnel_deduplication --
 2
 
diff --git a/regression-test/suites/nereids_p0/aggregate/window_funnel.groovy 
b/regression-test/suites/nereids_p0/aggregate/window_funnel.groovy
index 02562c49f4..6ad68bd2a3 100644
--- a/regression-test/suites/nereids_p0/aggregate/window_funnel.groovy
+++ b/regression-test/suites/nereids_p0/aggregate/window_funnel.groovy
@@ -19,8 +19,6 @@
 // 
/testing/trino-product-tests/src/main/resources/sql-tests/testcases/aggregate
 // and modified by Doris.
 
-import org.codehaus.groovy.runtime.IOGroovyMethods
-
 suite("window_funnel") {
     sql "SET enable_nereids_planner=true"
     sql "SET enable_fallback_to_original_planner=false"
@@ -107,31 +105,14 @@ suite("window_funnel") {
     """
     sql """ DROP TABLE IF EXISTS ${tableName} """
 
-    StringBuilder strBuilder = new StringBuilder()
-    strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser 
+ ":" + context.config.jdbcPassword)
-    strBuilder.append(" http://"; + context.config.feHttpAddress + 
"/rest/v1/config/fe?conf_item=be_exec_version")
-
-    String command = strBuilder.toString()
-    def process = command.toString().execute()
-    def code = process.waitFor()
-    def err = IOGroovyMethods.getText(new BufferedReader(new 
InputStreamReader(process.getErrorStream())));
-    def out = process.getText()
-    logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" 
+ err)
-    assertEquals(code, 0)
-    def response = parseJson(out.trim())
-    assertEquals(response.code, 0)
-    assertEquals(response.msg, "success")
-    def configJson = response.data.rows
-    def beExecVersion = 0
-    for (Object conf: configJson) {
-        assert conf instanceof Map
-        if (((Map<String, String>) conf).get("Name").toLowerCase() == 
"be_exec_version") {
-            beExecVersion = ((Map<String, String>) 
conf).get("Value").toInteger()
-        }
-    }
-    if (beExecVersion < 3) {
-        logger.warn("Be exec version(${beExecVersion}) is less than 3, skip 
window_funnel mode test")
-        return
+    String backend_id;
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+    for (String backendId in backendId_to_backendIP.keySet()) {
+        String be_host = backendId_to_backendIP[backendId]
+        String be_http_port = backendId_to_backendHttpPort[backendId]
+        curl("POST", 
"http://${be_host}:${be_http_port}/api/update_config?enable_window_funnel_function_v2=true";)
     }
 
     sql """ DROP TABLE IF EXISTS ${tableName} """
diff --git a/regression-test/suites/query_p0/aggregate/window_funnel.groovy 
b/regression-test/suites/query_p0/aggregate/window_funnel.groovy
index edefcbc939..1fc4cf555b 100644
--- a/regression-test/suites/query_p0/aggregate/window_funnel.groovy
+++ b/regression-test/suites/query_p0/aggregate/window_funnel.groovy
@@ -19,8 +19,6 @@
 // 
/testing/trino-product-tests/src/main/resources/sql-tests/testcases/aggregate
 // and modified by Doris.
 
-import org.codehaus.groovy.runtime.IOGroovyMethods
-
 suite("window_funnel") {
     sql "SET enable_nereids_planner=false"
     def tableName = "windowfunnel_test"
@@ -106,96 +104,14 @@ suite("window_funnel") {
     """
     sql """ DROP TABLE IF EXISTS ${tableName} """
 
-    StringBuilder strBuilder = new StringBuilder()
-    strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser 
+ ":" + context.config.jdbcPassword)
-    strBuilder.append(" http://"; + context.config.feHttpAddress + 
"/rest/v1/config/fe?conf_item=be_exec_version")
-
-    def command = strBuilder.toString()
-    def process = command.toString().execute()
-    def code = process.waitFor()
-    def err = IOGroovyMethods.getText(new BufferedReader(new 
InputStreamReader(process.getErrorStream())));
-    def out = process.getText()
-    logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" 
+ err)
-    assertEquals(code, 0)
-    def response = parseJson(out.trim())
-    assertEquals(response.code, 0)
-    assertEquals(response.msg, "success")
-    def configJson = response.data.rows
-    def beExecVersion = 0
-    for (Object conf: configJson) {
-        assert conf instanceof Map
-        if (((Map<String, String>) conf).get("Name").toLowerCase() == 
"be_exec_version") {
-            beExecVersion = ((Map<String, String>) 
conf).get("Value").toInteger()
-        }
-    }
-    if (beExecVersion < 3) {
-        sql """ DROP TABLE IF EXISTS ${tableName} """
-        sql """
-            CREATE TABLE IF NOT EXISTS ${tableName} (
-                xwho varchar(50) NULL COMMENT 'xwho',
-                xwhen datetimev2(3) COMMENT 'xwhen',
-                xwhat int NULL COMMENT 'xwhat'
-            )
-            DUPLICATE KEY(xwho)
-            DISTRIBUTED BY HASH(xwho) BUCKETS 3
-            PROPERTIES (
-            "replication_num" = "1"
-            );
-        """
-        sql "INSERT into ${tableName} (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:41:00.111111', 1)"
-        sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 13:28:02.111111', 2)"
-        sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 13:28:03.111111', 2)"
-        sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 14:15:01.111111', 3)"
-        sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 15:05:04.111111', 4)"
-        qt_window_funnel_deduplication_compat """
-            select
-                window_funnel(
-                    20000,
-                    'deduplication',
-                    t.xwhen,
-                    t.xwhat = 1,
-                    t.xwhat = 2,
-                    t.xwhat = 3,
-                    t.xwhat = 4
-                    ) AS level
-            from ${tableName} t;
-        """
-        sql """ DROP TABLE IF EXISTS ${tableName} """
-        logger.warn("Be exec version(${beExecVersion}) is less than 3, skip 
window_funnel mode test")
-        return
-    } else {
-        sql """ DROP TABLE IF EXISTS ${tableName} """
-        sql """
-            CREATE TABLE IF NOT EXISTS ${tableName} (
-                xwho varchar(50) NULL COMMENT 'xwho',
-                xwhen datetimev2(3) COMMENT 'xwhen',
-                xwhat int NULL COMMENT 'xwhat'
-            )
-            DUPLICATE KEY(xwho)
-            DISTRIBUTED BY HASH(xwho) BUCKETS 3
-            PROPERTIES (
-            "replication_num" = "1"
-            );
-        """
-        sql "INSERT into ${tableName} (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:41:00.111111', 1)"
-        sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 13:28:02.111111', 2)"
-        sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 13:28:03.111111', 2)"
-        sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 14:15:01.111111', 3)"
-        sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 15:05:04.111111', 4)"
-        qt_window_funnel_deduplication_compat """
-            select
-                window_funnel(
-                    20000,
-                    'default',
-                    t.xwhen,
-                    t.xwhat = 1,
-                    t.xwhat = 2,
-                    t.xwhat = 3,
-                    t.xwhat = 4
-                    ) AS level
-            from ${tableName} t;
-        """
-        sql """ DROP TABLE IF EXISTS ${tableName} """
+    String backend_id;
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+    for (String backendId in backendId_to_backendIP.keySet()) {
+        String be_host = backendId_to_backendIP[backendId]
+        String be_http_port = backendId_to_backendHttpPort[backendId]
+        curl("POST", 
"http://${be_host}:${be_http_port}/api/update_config?enable_window_funnel_function_v2=true";)
     }
 
     sql """ DROP TABLE IF EXISTS ${tableName} """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to