github-actions[bot] commented on code in PR #61566:
URL: https://github.com/apache/doris/pull/61566#discussion_r3008667269


##########
be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp:
##########
@@ -0,0 +1,1162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <stddef.h>
+
+#include <memory>
+#include <ostream>
+
+#include "core/column/column_string.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_date_or_datetime_v2.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "core/string_buffer.hpp"
+#include "core/value/vdatetime_value.h"
+#include "exprs/aggregate/aggregate_function.h"
+#include "exprs/aggregate/aggregate_function_simple_factory.h"
+#include "gtest/gtest_pred_impl.h"
+
+namespace doris {
+class IColumn;
+} // namespace doris
+
+namespace doris {
+
+// Forward declaration of the window_funnel_v2 registration entry point; its definition is not
+// part of the visible portion of this test file.
+void register_aggregate_function_window_funnel_v2(AggregateFunctionSimpleFactory& factory);
+
+// Fixture for the window_funnel_v2 tests. SetUp() looks up "window_funnel_v2" in the aggregate
+// function factory for the argument types (Int64, String, DateTimeV2, 4 x UInt8) so that each
+// test starts with a ready-to-use agg_function.
+class VWindowFunnelV2Test : public testing::Test {
+public:
+    // Aggregate function under test; assigned in SetUp().
+    AggregateFunctionPtr agg_function;
+
+    VWindowFunnelV2Test() = default;
+
+    void SetUp() override {
+        // Bind to the factory instance by reference instead of copying it for every test.
+        AggregateFunctionSimpleFactory& factory = AggregateFunctionSimpleFactory::instance();
+        DataTypes data_types = {
+                std::make_shared<DataTypeInt64>(),      std::make_shared<DataTypeString>(),
+                std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
+                std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>(),
+                std::make_shared<DataTypeUInt8>()};
+        agg_function = factory.get("window_funnel_v2", data_types, nullptr, false,
+                                   BeExecVersionManager::get_newest_version());
+        // Fatal on purpose: every test dereferences agg_function, so stop here instead of
+        // crashing on a null pointer inside the test body.
+        ASSERT_NE(agg_function, nullptr);
+    }
+
+    void TearDown() override {}
+
+    // Arena passed to the add/merge/deserialize calls made by the tests.
+    Arena arena;
+};
+
+// A state that never received a row is serialized and deserialized back into itself, then merged
+// with a second empty state; both states are expected to report 0.
+TEST_F(VWindowFunnelV2Test, testEmpty) {
+    std::unique_ptr<char[]> state_buf(new char[agg_function->size_of_data()]);
+    AggregateDataPtr state = state_buf.get();
+    agg_function->create(state);
+
+    // Round-trip the empty state through its serialized form.
+    ColumnString serialized;
+    VectorBufferWriter writer(serialized);
+    agg_function->serialize(state, writer);
+    writer.commit();
+    LOG(INFO) << "buf size : " << serialized.size();
+    VectorBufferReader reader(serialized.get_data_at(0));
+    agg_function->deserialize(state, reader, arena);
+
+    std::unique_ptr<char[]> other_buf(new char[agg_function->size_of_data()]);
+    AggregateDataPtr other = other_buf.get();
+    agg_function->create(other);
+
+    // Fold the second empty state into the first one.
+    agg_function->merge(state, other, arena);
+
+    ColumnInt32 merged_result;
+    agg_function->insert_result_into(state, merged_result);
+    EXPECT_EQ(merged_result.get_data()[0], 0);
+
+    ColumnInt32 other_result;
+    agg_function->insert_result_into(other, other_result);
+    EXPECT_EQ(other_result.get_data()[0], 0);
+
+    agg_function->destroy(state);
+    agg_function->destroy(other);
+}
+
+// Adds four rows (row k at second k, matching only event k + 1, window 2) to a state and expects
+// a result of 3. The state is then serialized, and a fresh state deserialized from those bytes
+// must report the same result.
+TEST_F(VWindowFunnelV2Test, testSerialize) {
+    const int NUM_CONDS = 4;
+
+    auto column_window = ColumnInt64::create();
+    auto column_mode = ColumnString::create();
+    auto column_timestamp = ColumnDateTimeV2::create();
+    auto column_event1 = ColumnUInt8::create();
+    auto column_event2 = ColumnUInt8::create();
+    auto column_event3 = ColumnUInt8::create();
+    auto column_event4 = ColumnUInt8::create();
+    for (int row = 0; row < NUM_CONDS; ++row) {
+        column_window->insert(Field::create_field<TYPE_BIGINT>(2));
+        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+
+        // Timestamps are one second apart, starting at 2022-02-28 00:00:00.
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, row);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data(reinterpret_cast<char*>(&dtv2), 0);
+
+        // Row k satisfies only event condition k + 1.
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(row == 0 ? 1 : 0));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(row == 1 ? 1 : 0));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(row == 2 ? 1 : 0));
+        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(row == 3 ? 1 : 0));
+    }
+
+    std::unique_ptr<char[]> source_buf(new char[agg_function->size_of_data()]);
+    AggregateDataPtr source_state = source_buf.get();
+    agg_function->create(source_state);
+    const IColumn* columns[7] = {column_window.get(), column_mode.get(),   column_timestamp.get(),
+                                 column_event1.get(), column_event2.get(), column_event3.get(),
+                                 column_event4.get()};
+    for (int row = 0; row < NUM_CONDS; ++row) {
+        agg_function->add(source_state, columns, row, arena);
+    }
+
+    ColumnInt32 source_result;
+    agg_function->insert_result_into(source_state, source_result);
+    EXPECT_EQ(source_result.get_data()[0], 3);
+
+    // Serialize, then drop the source state so only the bytes remain.
+    ColumnString serialized;
+    VectorBufferWriter writer(serialized);
+    agg_function->serialize(source_state, writer);
+    writer.commit();
+    agg_function->destroy(source_state);
+
+    std::unique_ptr<char[]> restored_buf(new char[agg_function->size_of_data()]);
+    AggregateDataPtr restored_state = restored_buf.get();
+    agg_function->create(restored_state);
+
+    VectorBufferReader reader(serialized.get_data_at(0));
+    agg_function->deserialize(restored_state, reader, arena);
+
+    ColumnInt32 restored_result;
+    agg_function->insert_result_into(restored_state, restored_result);
+    EXPECT_EQ(restored_result.get_data()[0], 3);
+    agg_function->destroy(restored_state);
+}
+
+// "default" mode, rows added in ascending timestamp order, result read from the state the rows
+// were added to. Row k (second k) matches only event k + 1. For every window in [0, NUM_CONDS]
+// the expected result is min(window + 1, NUM_CONDS).
+TEST_F(VWindowFunnelV2Test, testDefaultSortedNoMerge) {
+    const int NUM_CONDS = 4;
+    auto column_mode = ColumnString::create();
+    auto column_timestamp = ColumnDateTimeV2::create();
+    auto column_event1 = ColumnUInt8::create();
+    auto column_event2 = ColumnUInt8::create();
+    auto column_event3 = ColumnUInt8::create();
+    auto column_event4 = ColumnUInt8::create();
+    for (int row = 0; row < NUM_CONDS; row++) {
+        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, row);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data(reinterpret_cast<char*>(&dtv2), 0);
+
+        // Row k satisfies only event condition k + 1.
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(row == 0 ? 1 : 0));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(row == 1 ? 1 : 0));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(row == 2 ? 1 : 0));
+        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(row == 3 ? 1 : 0));
+    }
+
+    for (int win = 0; win < NUM_CONDS + 1; win++) {
+        auto column_window = ColumnInt64::create();
+        for (int row = 0; row < NUM_CONDS; row++) {
+            column_window->insert(Field::create_field<TYPE_BIGINT>(win));
+        }
+
+        auto memory = std::make_unique<char[]>(agg_function->size_of_data());
+        AggregateDataPtr place = memory.get();
+        agg_function->create(place);
+        const IColumn* column[7] = {column_window.get(),    column_mode.get(),
+                                    column_timestamp.get(), column_event1.get(),
+                                    column_event2.get(),    column_event3.get(),
+                                    column_event4.get()};
+        for (int row = 0; row < NUM_CONDS; row++) {
+            agg_function->add(place, column, row, arena);
+        }
+
+        ColumnInt32 column_result;
+        agg_function->insert_result_into(place, column_result);
+        // win is never negative in this loop, so no special case is needed for it.
+        EXPECT_EQ(column_result.get_data()[0], win < NUM_CONDS ? win + 1 : NUM_CONDS)
+                << "window=" << win;
+        agg_function->destroy(place);
+    }
+}
+
+// "default" mode, rows added in ascending timestamp order, result read from a second (initially
+// empty) state that the populated state was merged into. Row k (second k) matches only event
+// k + 1. For every window in [0, NUM_CONDS] the expected result is min(window + 1, NUM_CONDS).
+TEST_F(VWindowFunnelV2Test, testDefaultSortedMerge) {
+    const int NUM_CONDS = 4;
+    auto column_mode = ColumnString::create();
+    auto column_timestamp = ColumnDateTimeV2::create();
+    auto column_event1 = ColumnUInt8::create();
+    auto column_event2 = ColumnUInt8::create();
+    auto column_event3 = ColumnUInt8::create();
+    auto column_event4 = ColumnUInt8::create();
+    for (int row = 0; row < NUM_CONDS; row++) {
+        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, row);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data(reinterpret_cast<char*>(&dtv2), 0);
+
+        // Row k satisfies only event condition k + 1.
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(row == 0 ? 1 : 0));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(row == 1 ? 1 : 0));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(row == 2 ? 1 : 0));
+        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(row == 3 ? 1 : 0));
+    }
+
+    for (int win = 0; win < NUM_CONDS + 1; win++) {
+        auto column_window = ColumnInt64::create();
+        for (int row = 0; row < NUM_CONDS; row++) {
+            column_window->insert(Field::create_field<TYPE_BIGINT>(win));
+        }
+
+        auto memory = std::make_unique<char[]>(agg_function->size_of_data());
+        AggregateDataPtr place = memory.get();
+        agg_function->create(place);
+        const IColumn* column[7] = {column_window.get(),    column_mode.get(),
+                                    column_timestamp.get(), column_event1.get(),
+                                    column_event2.get(),    column_event3.get(),
+                                    column_event4.get()};
+        for (int row = 0; row < NUM_CONDS; row++) {
+            agg_function->add(place, column, row, arena);
+        }
+
+        auto memory2 = std::make_unique<char[]>(agg_function->size_of_data());
+        AggregateDataPtr place2 = memory2.get();
+        agg_function->create(place2);
+
+        // Merge the populated state into the empty one and read the result from the target.
+        agg_function->merge(place2, place, arena);
+        ColumnInt32 column_result;
+        agg_function->insert_result_into(place2, column_result);
+        // win is never negative in this loop, so no special case is needed for it.
+        EXPECT_EQ(column_result.get_data()[0], win < NUM_CONDS ? win + 1 : NUM_CONDS)
+                << "window=" << win;
+        agg_function->destroy(place);
+        agg_function->destroy(place2);
+    }
+}
+
+// "default" mode with rows added in descending timestamp order: row k carries second
+// NUM_CONDS - k and matches only event NUM_CONDS - k. The result is read from the state the
+// rows were added to. For every window in [0, NUM_CONDS] the expected result is
+// min(window + 1, NUM_CONDS).
+TEST_F(VWindowFunnelV2Test, testDefaultReverseSortedNoMerge) {
+    const int NUM_CONDS = 4;
+    auto column_mode = ColumnString::create();
+    auto column_timestamp = ColumnDateTimeV2::create();
+    auto column_event1 = ColumnUInt8::create();
+    auto column_event2 = ColumnUInt8::create();
+    auto column_event3 = ColumnUInt8::create();
+    auto column_event4 = ColumnUInt8::create();
+    for (int row = 0; row < NUM_CONDS; row++) {
+        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, NUM_CONDS - row);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data(reinterpret_cast<char*>(&dtv2), 0);
+
+        // The last row matches event1, the first row matches event4.
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(row == 3 ? 1 : 0));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(row == 2 ? 1 : 0));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(row == 1 ? 1 : 0));
+        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(row == 0 ? 1 : 0));
+    }
+
+    for (int win = 0; win < NUM_CONDS + 1; win++) {
+        auto column_window = ColumnInt64::create();
+        for (int row = 0; row < NUM_CONDS; row++) {
+            column_window->insert(Field::create_field<TYPE_BIGINT>(win));
+        }
+
+        auto memory = std::make_unique<char[]>(agg_function->size_of_data());
+        AggregateDataPtr place = memory.get();
+        agg_function->create(place);
+        const IColumn* column[7] = {column_window.get(),    column_mode.get(),
+                                    column_timestamp.get(), column_event1.get(),
+                                    column_event2.get(),    column_event3.get(),
+                                    column_event4.get()};
+        for (int row = 0; row < NUM_CONDS; row++) {
+            agg_function->add(place, column, row, arena);
+        }
+
+        LOG(INFO) << "win " << win;
+        ColumnInt32 column_result;
+        agg_function->insert_result_into(place, column_result);
+        // win is never negative in this loop, so no special case is needed for it.
+        EXPECT_EQ(column_result.get_data()[0], win < NUM_CONDS ? win + 1 : NUM_CONDS)
+                << "window=" << win;
+        agg_function->destroy(place);
+    }
+}
+
+// "default" mode with rows added in descending timestamp order: row k carries second
+// NUM_CONDS - k and matches only event NUM_CONDS - k. The populated state is merged into a
+// second, initially empty state and the result is read from that target. For every window in
+// [0, NUM_CONDS] the expected result is min(window + 1, NUM_CONDS).
+TEST_F(VWindowFunnelV2Test, testDefaultReverseSortedMerge) {
+    const int NUM_CONDS = 4;
+    auto column_mode = ColumnString::create();
+    auto column_timestamp = ColumnDateTimeV2::create();
+    auto column_event1 = ColumnUInt8::create();
+    auto column_event2 = ColumnUInt8::create();
+    auto column_event3 = ColumnUInt8::create();
+    auto column_event4 = ColumnUInt8::create();
+    for (int row = 0; row < NUM_CONDS; row++) {
+        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, NUM_CONDS - row);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data(reinterpret_cast<char*>(&dtv2), 0);
+
+        // The last row matches event1, the first row matches event4.
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(row == 3 ? 1 : 0));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(row == 2 ? 1 : 0));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(row == 1 ? 1 : 0));
+        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(row == 0 ? 1 : 0));
+    }
+
+    for (int win = 0; win < NUM_CONDS + 1; win++) {
+        auto column_window = ColumnInt64::create();
+        for (int row = 0; row < NUM_CONDS; row++) {
+            column_window->insert(Field::create_field<TYPE_BIGINT>(win));
+        }
+
+        auto memory = std::make_unique<char[]>(agg_function->size_of_data());
+        AggregateDataPtr place = memory.get();
+        agg_function->create(place);
+        const IColumn* column[7] = {column_window.get(),    column_mode.get(),
+                                    column_timestamp.get(), column_event1.get(),
+                                    column_event2.get(),    column_event3.get(),
+                                    column_event4.get()};
+        for (int row = 0; row < NUM_CONDS; row++) {
+            agg_function->add(place, column, row, arena);
+        }
+
+        auto memory2 = std::make_unique<char[]>(agg_function->size_of_data());
+        AggregateDataPtr place2 = memory2.get();
+        agg_function->create(place2);
+
+        // Merge the populated state into the empty one and read the result from the target.
+        agg_function->merge(place2, place, arena);
+        ColumnInt32 column_result;
+        agg_function->insert_result_into(place2, column_result);
+        // win is never negative in this loop, so no special case is needed for it.
+        EXPECT_EQ(column_result.get_data()[0], win < NUM_CONDS ? win + 1 : NUM_CONDS)
+                << "window=" << win;
+        agg_function->destroy(place);
+        agg_function->destroy(place2);
+    }
+}
+
+// Rows that match no event condition must not disturb the funnel result.
+// NOTE(review): the test name refers to V2 keeping only matched events, but the assertions here
+// only observe the final result (4); they do not inspect what the state stores.
+TEST_F(VWindowFunnelV2Test, testOnlyMatchedEventsStored) {
+    const int NUM_ROWS = 6;
+
+    // event_flags[r][e] is 1 when row r satisfies event condition e + 1.
+    const int event_flags[NUM_ROWS][4] = {
+            {1, 0, 0, 0}, // row 0: event1
+            {0, 1, 0, 0}, // row 1: event2
+            {0, 0, 0, 0}, // row 2: no match
+            {0, 0, 1, 0}, // row 3: event3
+            {0, 0, 0, 0}, // row 4: no match
+            {0, 0, 0, 1}, // row 5: event4
+    };
+
+    auto column_window = ColumnInt64::create();
+    auto column_mode = ColumnString::create();
+    auto column_timestamp = ColumnDateTimeV2::create();
+    auto column_event1 = ColumnUInt8::create();
+    auto column_event2 = ColumnUInt8::create();
+    auto column_event3 = ColumnUInt8::create();
+    auto column_event4 = ColumnUInt8::create();
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        column_window->insert(Field::create_field<TYPE_BIGINT>(10));
+        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+
+        // Timestamps are one second apart.
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, row);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data(reinterpret_cast<char*>(&dtv2), 0);
+
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(event_flags[row][0]));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(event_flags[row][1]));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(event_flags[row][2]));
+        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(event_flags[row][3]));
+    }
+
+    std::unique_ptr<char[]> state_buf(new char[agg_function->size_of_data()]);
+    AggregateDataPtr state = state_buf.get();
+    agg_function->create(state);
+    const IColumn* columns[7] = {column_window.get(), column_mode.get(),   column_timestamp.get(),
+                                 column_event1.get(), column_event2.get(), column_event3.get(),
+                                 column_event4.get()};
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        agg_function->add(state, columns, row, arena);
+    }
+
+    ColumnInt32 result;
+    agg_function->insert_result_into(state, result);
+    // All four events occur in order within the 10 second window.
+    EXPECT_EQ(result.get_data()[0], 4);
+    agg_function->destroy(state);
+}
+
+// "increase" mode: consecutive funnel steps need strictly increasing timestamps.
+// Row k matches only event k + 1, but the rows for event2 and event3 share the same timestamp.
+TEST_F(VWindowFunnelV2Test, testIncreaseMode) {
+    const int NUM_ROWS = 4;
+    // Second-of-minute for each row; rows 1 and 2 deliberately collide.
+    const int seconds[NUM_ROWS] = {0, 1, 1, 3};
+
+    auto column_window = ColumnInt64::create();
+    auto column_mode = ColumnString::create();
+    auto column_timestamp = ColumnDateTimeV2::create();
+    auto column_event1 = ColumnUInt8::create();
+    auto column_event2 = ColumnUInt8::create();
+    auto column_event3 = ColumnUInt8::create();
+    auto column_event4 = ColumnUInt8::create();
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        column_window->insert(Field::create_field<TYPE_BIGINT>(10));
+        column_mode->insert(Field::create_field<TYPE_STRING>("increase"));
+
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, seconds[row]);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data(reinterpret_cast<char*>(&dtv2), 0);
+
+        // Row k satisfies only event condition k + 1.
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(row == 0 ? 1 : 0));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(row == 1 ? 1 : 0));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(row == 2 ? 1 : 0));
+        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(row == 3 ? 1 : 0));
+    }
+
+    std::unique_ptr<char[]> state_buf(new char[agg_function->size_of_data()]);
+    AggregateDataPtr state = state_buf.get();
+    agg_function->create(state);
+    const IColumn* columns[7] = {column_window.get(), column_mode.get(),   column_timestamp.get(),
+                                 column_event1.get(), column_event2.get(), column_event3.get(),
+                                 column_event4.get()};
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        agg_function->add(state, columns, row, arena);
+    }
+
+    ColumnInt32 result;
+    agg_function->insert_result_into(state, result);
+    // Chain: event1(t=0) -> event2(t=1). event3 has the same timestamp as event2, so it cannot
+    // extend the chain in increase mode. Expected result: 2.
+    EXPECT_EQ(result.get_data()[0], 2);
+    agg_function->destroy(state);
+}
+
+// "deduplication" mode: a repeated event breaks the chain in progress.
+// Event sequence by row: event1, event2, event1 (duplicate), event3, event4.
+TEST_F(VWindowFunnelV2Test, testDeduplicationMode) {
+    const int NUM_ROWS = 5;
+
+    // event_flags[r][e] is 1 when row r satisfies event condition e + 1.
+    const int event_flags[NUM_ROWS][4] = {
+            {1, 0, 0, 0}, // row 0: event1
+            {0, 1, 0, 0}, // row 1: event2
+            {1, 0, 0, 0}, // row 2: event1 again (duplicate)
+            {0, 0, 1, 0}, // row 3: event3
+            {0, 0, 0, 1}, // row 4: event4
+    };
+
+    auto column_window = ColumnInt64::create();
+    auto column_mode = ColumnString::create();
+    auto column_timestamp = ColumnDateTimeV2::create();
+    auto column_event1 = ColumnUInt8::create();
+    auto column_event2 = ColumnUInt8::create();
+    auto column_event3 = ColumnUInt8::create();
+    auto column_event4 = ColumnUInt8::create();
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        column_window->insert(Field::create_field<TYPE_BIGINT>(10));
+        column_mode->insert(Field::create_field<TYPE_STRING>("deduplication"));
+
+        // Timestamps are one second apart (t = row).
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, row);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data(reinterpret_cast<char*>(&dtv2), 0);
+
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(event_flags[row][0]));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(event_flags[row][1]));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(event_flags[row][2]));
+        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(event_flags[row][3]));
+    }
+
+    std::unique_ptr<char[]> state_buf(new char[agg_function->size_of_data()]);
+    AggregateDataPtr state = state_buf.get();
+    agg_function->create(state);
+    const IColumn* columns[7] = {column_window.get(), column_mode.get(),   column_timestamp.get(),
+                                 column_event1.get(), column_event2.get(), column_event3.get(),
+                                 column_event4.get()};
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        agg_function->add(state, columns, row, arena);
+    }
+
+    ColumnInt32 result;
+    agg_function->insert_result_into(state, result);
+    // First chain: event1(t=0) -> event2(t=1) reaches level 2, then the duplicate event1 at
+    // t=2 breaks it. The chain restarted at t=2 cannot advance, because event3(t=3) needs
+    // event2 first, so it stays at level 1. Expected result: max(2, 1) = 2.
+    EXPECT_EQ(result.get_data()[0], 2);
+    agg_function->destroy(state);
+}
+
+// Test FIXED mode (StarRocks-style): event level must not jump
+TEST_F(VWindowFunnelV2Test, testFixedMode) {
+    const int NUM_ROWS = 5;
+    auto column_mode = ColumnString::create();
+    for (int i = 0; i < NUM_ROWS; i++) {
+        column_mode->insert(Field::create_field<TYPE_STRING>("fixed"));
+    }
+    auto column_timestamp = ColumnDateTimeV2::create();
+    VecDateTimeValue tv0, tv1, tv2, tv3, tv4;
+    tv0.unchecked_set_time(2022, 2, 28, 0, 0, 0);
+    tv1.unchecked_set_time(2022, 2, 28, 0, 0, 1);
+    tv2.unchecked_set_time(2022, 2, 28, 0, 0, 2);
+    tv3.unchecked_set_time(2022, 2, 28, 0, 0, 3);
+    tv4.unchecked_set_time(2022, 2, 28, 0, 0, 4);
+    auto dtv2_0 = tv0.to_datetime_v2();
+    auto dtv2_1 = tv1.to_datetime_v2();
+    auto dtv2_2 = tv2.to_datetime_v2();
+    auto dtv2_3 = tv3.to_datetime_v2();
+    auto dtv2_4 = tv4.to_datetime_v2();
+    column_timestamp->insert_data((char*)&dtv2_0, 0);
+    column_timestamp->insert_data((char*)&dtv2_1, 0);
+    column_timestamp->insert_data((char*)&dtv2_2, 0);
+    column_timestamp->insert_data((char*)&dtv2_3, 0);
+    column_timestamp->insert_data((char*)&dtv2_4, 0);
+
+    // Events: event1, event2, event4(jump! skips event3), event3, event4
+    // In V2 fixed mode (StarRocks-style), event4 at t=2 has no predecessor 
(event3 not matched),
+    // so the chain breaks.
+    auto column_event1 = ColumnUInt8::create();
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+    auto column_event2 = ColumnUInt8::create();
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(1));
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+    auto column_event3 = ColumnUInt8::create();
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(1));
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+    auto column_event4 = ColumnUInt8::create();
+    column_event4->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event4->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event4->insert(Field::create_field<TYPE_BOOLEAN>(1)); // jump
+    column_event4->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event4->insert(Field::create_field<TYPE_BOOLEAN>(1));
+
+    auto column_window = ColumnInt64::create();
+    for (int i = 0; i < NUM_ROWS; i++) {
+        column_window->insert(Field::create_field<TYPE_BIGINT>(10));
+    }
+
+    std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
+    AggregateDataPtr place = memory.get();
+    agg_function->create(place);
+    const IColumn* column[7] = {column_window.get(), column_mode.get(),   
column_timestamp.get(),
+                                column_event1.get(), column_event2.get(), 
column_event3.get(),
+                                column_event4.get()};
+    for (int i = 0; i < NUM_ROWS; i++) {
+        agg_function->add(place, column, i, arena);
+    }
+
+    ColumnInt32 column_result;
+    agg_function->insert_result_into(place, column_result);
+    // Chain: event1(t=0) -> event2(t=1), then event4(t=2) jumps (no event3 
predecessor),
+    // chain breaks, max_level=2.
+    // No further complete chain starts since event3 and event4 happen after.
+    EXPECT_EQ(column_result.get_data()[0], 2);
+    agg_function->destroy(place);
+}
+
+// Test that same-row multi-condition matching does NOT advance the chain
+// through multiple levels. This tests the continuation flag logic.
+// Scenario: window_funnel(86400, 'default', ts, xwhat=1, xwhat!=2, xwhat=3)
+// Row 0 (xwhat=1): matches cond0=T (xwhat=1), cond1=T (1!=2), cond2=F → 2 
conditions on same row
+// Row 1 (xwhat=2): matches nothing (cond1: 2!=2=F)
+// Row 2 (xwhat=3): matches cond1=T (3!=2), cond2=T (xwhat=3) → 2 conditions 
on same row
+// Correct result: 2 (cond0 from row0, cond1 from row2). NOT 3.
+// Without the continuation flag, row0 would advance through both cond0 and 
cond1,
+// then row2 would match cond2 → result 3 (wrong).
+TEST_F(VWindowFunnelV2Test, testSameRowMultiConditionDefault) {
+    // 3 conditions (instead of 4), so we need a different setup
+    AggregateFunctionSimpleFactory factory = 
AggregateFunctionSimpleFactory::instance();
+    DataTypes data_types_3 = {
+            std::make_shared<DataTypeInt64>(),      
std::make_shared<DataTypeString>(),
+            std::make_shared<DataTypeDateTimeV2>(), 
std::make_shared<DataTypeUInt8>(),
+            std::make_shared<DataTypeUInt8>(),      
std::make_shared<DataTypeUInt8>()};
+    auto agg_func_3 = factory.get("window_funnel_v2", data_types_3, nullptr, 
false,
+                                  BeExecVersionManager::get_newest_version());
+    ASSERT_NE(agg_func_3, nullptr);
+
+    const int NUM_ROWS = 4;
+    auto column_mode = ColumnString::create();
+    for (int i = 0; i < NUM_ROWS; i++) {
+        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+    }
+    auto column_timestamp = ColumnDateTimeV2::create();
+    // Row 0: ts=10:41:00  (xwhat=1)
+    // Row 1: ts=13:28:02  (xwhat=2)
+    // Row 2: ts=16:15:01  (xwhat=3)
+    // Row 3: ts=19:05:04  (xwhat=4)
+    VecDateTimeValue tv0, tv1, tv2, tv3;
+    tv0.unchecked_set_time(2022, 3, 12, 10, 41, 0);
+    tv1.unchecked_set_time(2022, 3, 12, 13, 28, 2);
+    tv2.unchecked_set_time(2022, 3, 12, 16, 15, 1);
+    tv3.unchecked_set_time(2022, 3, 12, 19, 5, 4);
+    auto dtv2_0 = tv0.to_datetime_v2();
+    auto dtv2_1 = tv1.to_datetime_v2();
+    auto dtv2_2 = tv2.to_datetime_v2();
+    auto dtv2_3 = tv3.to_datetime_v2();
+    column_timestamp->insert_data((char*)&dtv2_0, 0);
+    column_timestamp->insert_data((char*)&dtv2_1, 0);
+    column_timestamp->insert_data((char*)&dtv2_2, 0);
+    column_timestamp->insert_data((char*)&dtv2_3, 0);
+
+    // cond0: xwhat=1 (only row0 matches)
+    auto column_cond0 = ColumnUInt8::create();
+    column_cond0->insert(Field::create_field<TYPE_BOOLEAN>(1)); // row0: 
xwhat=1 → T
+    column_cond0->insert(Field::create_field<TYPE_BOOLEAN>(0)); // row1: 
xwhat=2 → F
+    column_cond0->insert(Field::create_field<TYPE_BOOLEAN>(0)); // row2: 
xwhat=3 → F
+    column_cond0->insert(Field::create_field<TYPE_BOOLEAN>(0)); // row3: 
xwhat=4 → F
+
+    // cond1: xwhat!=2 (rows 0,2,3 match)
+    auto column_cond1 = ColumnUInt8::create();
+    column_cond1->insert(Field::create_field<TYPE_BOOLEAN>(1)); // row0: 1!=2 
→ T
+    column_cond1->insert(Field::create_field<TYPE_BOOLEAN>(0)); // row1: 2!=2 
→ F
+    column_cond1->insert(Field::create_field<TYPE_BOOLEAN>(1)); // row2: 3!=2 
→ T
+    column_cond1->insert(Field::create_field<TYPE_BOOLEAN>(1)); // row3: 4!=2 
→ T
+
+    // cond2: xwhat=3 (only row2 matches)
+    auto column_cond2 = ColumnUInt8::create();
+    column_cond2->insert(Field::create_field<TYPE_BOOLEAN>(0)); // row0: F
+    column_cond2->insert(Field::create_field<TYPE_BOOLEAN>(0)); // row1: F
+    column_cond2->insert(Field::create_field<TYPE_BOOLEAN>(1)); // row2: T
+    column_cond2->insert(Field::create_field<TYPE_BOOLEAN>(0)); // row3: F
+
+    // window = 86400 seconds (24 hours)
+    auto column_window = ColumnInt64::create();
+    for (int i = 0; i < NUM_ROWS; i++) {
+        column_window->insert(Field::create_field<TYPE_BIGINT>(86400));
+    }
+
+    std::unique_ptr<char[]> memory(new char[agg_func_3->size_of_data()]);
+    AggregateDataPtr place = memory.get();
+    agg_func_3->create(place);
+    const IColumn* column[6] = {column_window.get(), column_mode.get(),  
column_timestamp.get(),
+                                column_cond0.get(),  column_cond1.get(), 
column_cond2.get()};
+    for (int i = 0; i < NUM_ROWS; i++) {
+        agg_func_3->add(place, column, i, arena);
+    }
+
+    ColumnInt32 column_result;
+    agg_func_3->insert_result_into(place, column_result);
+    // Without continuation flag: row0 matches cond0+cond1 (same row advances 
both),
+    // row2 matches cond2 → result=3 (WRONG)
+    // With continuation flag: row0 sets cond0 only (cond1 is same-row 
continuation),
+    // row2's cond1 extends chain (different row), but cond2 from row2 is 
same-row → stops at 2
+    EXPECT_EQ(column_result.get_data()[0], 2);
+    agg_func_3->destroy(place);
+}
+
+// Test same-row multi-condition with ALL conditions matching the same event 
name
+// window_funnel(big_window, 'default', ts, event='登录', event='登录', 
event='登录', ...)
+// A single row matching all 4 conditions should only count as level 1 (not 4).
+TEST_F(VWindowFunnelV2Test, testSameRowAllConditionsMatch) {
+    auto column_mode = ColumnString::create();
+    column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+
+    auto column_timestamp = ColumnDateTimeV2::create();
+    VecDateTimeValue tv0;
+    tv0.unchecked_set_time(2022, 2, 28, 0, 0, 0);
+    auto dtv2_0 = tv0.to_datetime_v2();
+    column_timestamp->insert_data((char*)&dtv2_0, 0);
+
+    // All 4 conditions match the single row
+    auto column_event1 = ColumnUInt8::create();
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+    auto column_event2 = ColumnUInt8::create();
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(1));
+    auto column_event3 = ColumnUInt8::create();
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(1));
+    auto column_event4 = ColumnUInt8::create();
+    column_event4->insert(Field::create_field<TYPE_BOOLEAN>(1));
+
+    auto column_window = ColumnInt64::create();
+    column_window->insert(Field::create_field<TYPE_BIGINT>(86400));
+
+    std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
+    AggregateDataPtr place = memory.get();
+    agg_function->create(place);
+    const IColumn* column[7] = {column_window.get(), column_mode.get(),   
column_timestamp.get(),
+                                column_event1.get(), column_event2.get(), 
column_event3.get(),
+                                column_event4.get()};
+    agg_function->add(place, column, 0, arena);
+
+    ColumnInt32 column_result;
+    agg_function->insert_result_into(place, column_result);
+    // Only 1 row matching all conditions → can only reach level 1
+    // because each funnel step must come from a different row
+    EXPECT_EQ(column_result.get_data()[0], 1);
+    agg_function->destroy(place);
+}
+
+// Test INCREASE mode: event-0 re-occurrence should not break an already-valid 
chain.
+// Counterexample from code review:
+//   3 conditions, window=100s, INCREASE mode
+//   Row A (t=0s): event1 only
+//   Row B (t=50s): event1 only  (new event-0 overwrites old)
+//   Row C (t=50s): event2 only
+//   Row D (t=60s): event3 only
+// Correct result: 3 (chain starting from t=0: e1@0 -> e2@50 -> e3@60)
+// Bug (before fix): returned 1 because overwriting events_timestamp[0] with 
t=50
+// caused the INCREASE check (50 < 50) to fail for e2.
+TEST_F(VWindowFunnelV2Test, testIncreaseModeEvent0Overwrite) {
+    AggregateFunctionSimpleFactory factory = 
AggregateFunctionSimpleFactory::instance();
+    DataTypes data_types_3 = {
+            std::make_shared<DataTypeInt64>(),      
std::make_shared<DataTypeString>(),
+            std::make_shared<DataTypeDateTimeV2>(), 
std::make_shared<DataTypeUInt8>(),
+            std::make_shared<DataTypeUInt8>(),      
std::make_shared<DataTypeUInt8>()};
+    auto agg_func_3 = factory.get("window_funnel_v2", data_types_3, nullptr, 
false,
+                                  BeExecVersionManager::get_newest_version());
+    ASSERT_NE(agg_func_3, nullptr);
+
+    const int NUM_ROWS = 4;
+    auto column_mode = ColumnString::create();
+    for (int i = 0; i < NUM_ROWS; i++) {
+        column_mode->insert(Field::create_field<TYPE_STRING>("increase"));
+    }
+    auto column_timestamp = ColumnDateTimeV2::create();
+    // Row 0: t=0s, Row 1: t=50s, Row 2: t=50s, Row 3: t=60s
+    VecDateTimeValue tv0, tv1, tv2, tv3;
+    tv0.unchecked_set_time(2022, 2, 28, 0, 0, 0);
+    tv1.unchecked_set_time(2022, 2, 28, 0, 0, 50);
+    tv2.unchecked_set_time(2022, 2, 28, 0, 0, 50);
+    tv3.unchecked_set_time(2022, 2, 28, 0, 1, 0);
+    auto dtv2_0 = tv0.to_datetime_v2();
+    auto dtv2_1 = tv1.to_datetime_v2();
+    auto dtv2_2 = tv2.to_datetime_v2();
+    auto dtv2_3 = tv3.to_datetime_v2();
+    column_timestamp->insert_data((char*)&dtv2_0, 0);
+    column_timestamp->insert_data((char*)&dtv2_1, 0);
+    column_timestamp->insert_data((char*)&dtv2_2, 0);
+    column_timestamp->insert_data((char*)&dtv2_3, 0);
+
+    // Row 0: event1=T, event2=F, event3=F
+    // Row 1: event1=T, event2=F, event3=F  (duplicate event-0)
+    // Row 2: event1=F, event2=T, event3=F
+    // Row 3: event1=F, event2=F, event3=T
+    auto column_event1 = ColumnUInt8::create();
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+    auto column_event2 = ColumnUInt8::create();
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(1));
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+    auto column_event3 = ColumnUInt8::create();
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(1));
+
+    auto column_window = ColumnInt64::create();
+    for (int i = 0; i < NUM_ROWS; i++) {
+        column_window->insert(Field::create_field<TYPE_BIGINT>(100));
+    }
+
+    std::unique_ptr<char[]> memory(new char[agg_func_3->size_of_data()]);
+    AggregateDataPtr place = memory.get();
+    agg_func_3->create(place);
+    const IColumn* column[6] = {column_window.get(), column_mode.get(),   
column_timestamp.get(),
+                                column_event1.get(), column_event2.get(), 
column_event3.get()};
+    for (int i = 0; i < NUM_ROWS; i++) {
+        agg_func_3->add(place, column, i, arena);
+    }
+
+    ColumnInt32 column_result;
+    agg_func_3->insert_result_into(place, column_result);
+    // Chain from t=0: e1@0 -> e2@50 (50>0 ✓) -> e3@60 (60>50 ✓) = 3
+    EXPECT_EQ(column_result.get_data()[0], 3);
+    agg_func_3->destroy(place);
+}
+
+// Test INCREASE mode: later event-0 starts a better chain when early chain 
cannot complete.
+// 3 conditions, window=5s, INCREASE mode
+//   Row 0 (t=0s): event1
+//   Row 1 (t=50s): event1  (new start — old chain can't reach e2 within 5s)
+//   Row 2 (t=51s): event2
+//   Row 3 (t=52s): event3
+// Correct result: 3 (chain starting from t=50: e1@50 -> e2@51 -> e3@52)
+TEST_F(VWindowFunnelV2Test, testIncreaseModeNewChainBetter) {
+    AggregateFunctionSimpleFactory factory = 
AggregateFunctionSimpleFactory::instance();
+    DataTypes data_types_3 = {
+            std::make_shared<DataTypeInt64>(),      
std::make_shared<DataTypeString>(),
+            std::make_shared<DataTypeDateTimeV2>(), 
std::make_shared<DataTypeUInt8>(),
+            std::make_shared<DataTypeUInt8>(),      
std::make_shared<DataTypeUInt8>()};
+    auto agg_func_3 = factory.get("window_funnel_v2", data_types_3, nullptr, 
false,
+                                  BeExecVersionManager::get_newest_version());
+    ASSERT_NE(agg_func_3, nullptr);
+
+    const int NUM_ROWS = 4;
+    auto column_mode = ColumnString::create();
+    for (int i = 0; i < NUM_ROWS; i++) {
+        column_mode->insert(Field::create_field<TYPE_STRING>("increase"));
+    }
+    auto column_timestamp = ColumnDateTimeV2::create();
+    VecDateTimeValue tv0, tv1, tv2, tv3;
+    tv0.unchecked_set_time(2022, 2, 28, 0, 0, 0);
+    tv1.unchecked_set_time(2022, 2, 28, 0, 0, 50);
+    tv2.unchecked_set_time(2022, 2, 28, 0, 0, 51);
+    tv3.unchecked_set_time(2022, 2, 28, 0, 0, 52);
+    auto dtv2_0 = tv0.to_datetime_v2();
+    auto dtv2_1 = tv1.to_datetime_v2();
+    auto dtv2_2 = tv2.to_datetime_v2();
+    auto dtv2_3 = tv3.to_datetime_v2();
+    column_timestamp->insert_data((char*)&dtv2_0, 0);
+    column_timestamp->insert_data((char*)&dtv2_1, 0);
+    column_timestamp->insert_data((char*)&dtv2_2, 0);
+    column_timestamp->insert_data((char*)&dtv2_3, 0);
+
+    auto column_event1 = ColumnUInt8::create();
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+    auto column_event2 = ColumnUInt8::create();
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(1));
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+    auto column_event3 = ColumnUInt8::create();
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(1));
+
+    auto column_window = ColumnInt64::create();
+    for (int i = 0; i < NUM_ROWS; i++) {
+        column_window->insert(Field::create_field<TYPE_BIGINT>(5));
+    }
+
+    std::unique_ptr<char[]> memory(new char[agg_func_3->size_of_data()]);
+    AggregateDataPtr place = memory.get();
+    agg_func_3->create(place);
+    const IColumn* column[6] = {column_window.get(), column_mode.get(),   
column_timestamp.get(),
+                                column_event1.get(), column_event2.get(), 
column_event3.get()};
+    for (int i = 0; i < NUM_ROWS; i++) {
+        agg_func_3->add(place, column, i, arena);
+    }
+
+    ColumnInt32 column_result;
+    agg_func_3->insert_result_into(place, column_result);
+    // Old chain from t=0 reaches only level 1 (e2@51 is 51s away, outside 5s 
window)
+    // New chain from t=50: e1@50 -> e2@51 (51>50 ✓, within 5s) -> e3@52 
(52>51 ✓) = 3
+    EXPECT_EQ(column_result.get_data()[0], 3);
+    agg_func_3->destroy(place);
+}
+
+// Test INCREASE mode: old chain is better than new chain (max_level 
preserved).
+// 3 conditions, window=100s, INCREASE mode
+//   Row 0 (t=0s): event1
+//   Row 1 (t=10s): event2
+//   Row 2 (t=50s): event1  (restarts chain, old chain had level=2)
+//   No more events after the restart — new chain can only reach level 1.
+// Correct result: 2 (old chain: e1@0 -> e2@10)
+TEST_F(VWindowFunnelV2Test, testIncreaseModeOldChainBetter) {
+    AggregateFunctionSimpleFactory factory = 
AggregateFunctionSimpleFactory::instance();
+    DataTypes data_types_3 = {
+            std::make_shared<DataTypeInt64>(),      
std::make_shared<DataTypeString>(),
+            std::make_shared<DataTypeDateTimeV2>(), 
std::make_shared<DataTypeUInt8>(),
+            std::make_shared<DataTypeUInt8>(),      
std::make_shared<DataTypeUInt8>()};
+    auto agg_func_3 = factory.get("window_funnel_v2", data_types_3, nullptr, 
false,
+                                  BeExecVersionManager::get_newest_version());
+    ASSERT_NE(agg_func_3, nullptr);
+
+    const int NUM_ROWS = 3;
+    auto column_mode = ColumnString::create();
+    for (int i = 0; i < NUM_ROWS; i++) {
+        column_mode->insert(Field::create_field<TYPE_STRING>("increase"));
+    }
+    auto column_timestamp = ColumnDateTimeV2::create();
+    VecDateTimeValue tv0, tv1, tv2;
+    tv0.unchecked_set_time(2022, 2, 28, 0, 0, 0);
+    tv1.unchecked_set_time(2022, 2, 28, 0, 0, 10);
+    tv2.unchecked_set_time(2022, 2, 28, 0, 0, 50);
+    auto dtv2_0 = tv0.to_datetime_v2();
+    auto dtv2_1 = tv1.to_datetime_v2();
+    auto dtv2_2 = tv2.to_datetime_v2();
+    column_timestamp->insert_data((char*)&dtv2_0, 0);
+    column_timestamp->insert_data((char*)&dtv2_1, 0);
+    column_timestamp->insert_data((char*)&dtv2_2, 0);
+
+    // Row 0: e1=T, Row 1: e2=T, Row 2: e1=T (restart)
+    auto column_event1 = ColumnUInt8::create();
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+
+    auto column_event2 = ColumnUInt8::create();
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(1));
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+    auto column_event3 = ColumnUInt8::create();
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+    auto column_window = ColumnInt64::create();
+    for (int i = 0; i < NUM_ROWS; i++) {
+        column_window->insert(Field::create_field<TYPE_BIGINT>(100));
+    }
+
+    std::unique_ptr<char[]> memory(new char[agg_func_3->size_of_data()]);
+    AggregateDataPtr place = memory.get();
+    agg_func_3->create(place);
+    const IColumn* column[6] = {column_window.get(), column_mode.get(),   
column_timestamp.get(),
+                                column_event1.get(), column_event2.get(), 
column_event3.get()};
+    for (int i = 0; i < NUM_ROWS; i++) {
+        agg_func_3->add(place, column, i, arena);
+    }
+
+    ColumnInt32 column_result;
+    agg_func_3->insert_result_into(place, column_result);
+    // Old chain: e1@0 -> e2@10 = level 2
+    // New chain from t=50: only e1@50, no further events = level 1
+    // max(2, 1) = 2
+    EXPECT_EQ(column_result.get_data()[0], 2);
+    agg_func_3->destroy(place);
+}
+auto column_timestamp = ColumnDateTimeV2::create();

Review Comment:
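   **[Critical: Compilation Error]** Lines 1106-1160 contain orphan code at 
namespace scope — outside any `TEST_F` block. This code references names that 
are not visible there: `NUM_ROWS`, `agg_func_3`, and `column_mode` are local 
variables of the preceding `testIncreaseModeOldChainBetter` test, and `arena` 
is used by the tests without a local declaration (presumably a 
`VWindowFunnelV2Test` fixture member, reachable only from inside a `TEST_F` 
body). This will **fail to compile**.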
   
   It appears to be a partially pasted copy of the test above with modified 
event data (event3 matches at row 3 instead of being all zeros). Either:
   1. Remove this orphan code entirely, or
   2. Wrap it in a proper `TEST_F(VWindowFunnelV2Test, testIncreaseModeXxx)` 
block with its own setup (including `agg_func_3`, `column_mode`, `NUM_ROWS`, 
etc.)



##########
regression-test/suites/nereids_p0/aggregate/window_funnel_v2.groovy:
##########
@@ -0,0 +1,492 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("window_funnel_v2") {
+    sql "SET enable_nereids_planner=true"
+    sql "SET enable_fallback_to_original_planner=false"
+
+    // ==================== Basic DEFAULT mode tests ====================
+    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+    sql """
+        CREATE TABLE IF NOT EXISTS windowfunnel_v2_test (
+            xwho varchar(50) NULL COMMENT 'xwho',
+            xwhen datetime COMMENT 'xwhen',
+            xwhat int NULL COMMENT 'xwhat'
+        )
+        DUPLICATE KEY(xwho)
+        DISTRIBUTED BY HASH(xwho) BUCKETS 3
+        PROPERTIES (
+        "replication_num" = "1"
+        );
+    """
+    sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:41:00', 1)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 13:28:02', 2)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 16:15:01', 3)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 19:05:04', 4)"
+
+    // window=1 second, only event1 matches (events too far apart)
+    order_qt_v2_default_small_window """
+        select
+            window_funnel(
+                1,
+                'default',
+                t.xwhen,
+                t.xwhat = 1,
+                t.xwhat = 2
+            ) AS level
+        from windowfunnel_v2_test t;
+    """
+    // window=20000 seconds, both events match
+    order_qt_v2_default_large_window """
+        select
+            window_funnel(
+                20000,
+                'default',
+                t.xwhen,
+                t.xwhat = 1,
+                t.xwhat = 2
+            ) AS level
+        from windowfunnel_v2_test t;
+    """
+
+    // ==================== DateTimeV2 precision test ====================
+    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+    sql """
+        CREATE TABLE IF NOT EXISTS windowfunnel_v2_test (
+            xwho varchar(50) NULL COMMENT 'xwho',
+            xwhen datetimev2(3) COMMENT 'xwhen',
+            xwhat int NULL COMMENT 'xwhat'
+        )
+        DUPLICATE KEY(xwho)
+        DISTRIBUTED BY HASH(xwho) BUCKETS 3
+        PROPERTIES (
+        "replication_num" = "1"
+        );
+    """
+    sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:41:00.111111', 1)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 13:28:02.111111', 2)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 16:15:01.111111', 3)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 19:05:04.111111', 4)"
+
+    order_qt_v2_datetimev2_small """
+        select
+            window_funnel(
+                1,
+                'default',
+                t.xwhen,
+                t.xwhat = 1,
+                t.xwhat = 2
+            ) AS level
+        from windowfunnel_v2_test t;
+    """
+    order_qt_v2_datetimev2_large """
+        select
+            window_funnel(
+                20000,
+                'default',
+                t.xwhen,
+                t.xwhat = 1,
+                t.xwhat = 2
+            ) AS level
+        from windowfunnel_v2_test t;
+    """
+
+    // ==================== Multi-user default mode tests ====================
+    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+    sql """
+        CREATE TABLE windowfunnel_v2_test(
+            user_id BIGINT,
+            event_name VARCHAR(64),
+            event_timestamp datetime,
+            phone_brand varchar(64),
+            tab_num int
+        ) distributed by hash(user_id) buckets 3 
properties("replication_num"="1");
+    """
+    sql """
+        INSERT INTO windowfunnel_v2_test VALUES
+            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 3),
+            (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4),
+            (100125, '登录', '2022-05-15 11:00:00', 'XIAOMI', 1),
+            (100125, '访问', '2022-05-15 11:01:00', 'XIAOMI', 2),
+            (100125, '下单', '2022-05-15 11:02:00', 'XIAOMI', 6),
+            (100126, '登录', '2022-05-15 12:00:00', 'IPHONE', 1),
+            (100126, '访问', '2022-05-15 12:01:00', 'HONOR', 2),
+            (100127, '登录', '2022-05-15 11:30:00', 'VIVO', 1),
+            (100127, '访问', '2022-05-15 11:31:00', 'VIVO', 5);
+    """
+    // 3 hour window
+    order_qt_v2_multi_user_default0 """
+        SELECT
+            user_id,
+            window_funnel(3600 * 3, "default", event_timestamp, event_name = 
'登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id
+    """
+    // 5 minute window
+    order_qt_v2_multi_user_default1 """
+        SELECT
+            user_id,
+            window_funnel(300, "default", event_timestamp, event_name = '登录', 
event_name = '访问', event_name = '下单', event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id
+    """
+    // 30 second window
+    order_qt_v2_multi_user_default2 """
+        SELECT
+            user_id,
+            window_funnel(30, "default", event_timestamp, event_name = '登录', 
event_name = '访问', event_name = '下单', event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id
+    """
+    // complicate expressions with != condition
+    order_qt_v2_default_neq """
+        SELECT
+            user_id,
+            window_funnel(3600000000, "default", event_timestamp, event_name = 
'登录', event_name != '登陆', event_name = '下单', event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id;
+    """
+    // Complex filter conditions
+    order_qt_v2_default_complex """
+        SELECT
+            user_id,
+            window_funnel(3600000000, "default", event_timestamp,
+                          event_name = '登录' AND phone_brand in ('HONOR', 
'XIAOMI', 'VIVO') AND tab_num not in (4, 5),
+                          event_name = '访问' AND tab_num not in (4, 5),
+                          event_name = '下单' AND tab_num not in (6, 7),
+                          event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id;
+    """
+
+    // ==================== DEDUPLICATION mode tests ====================
+    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+    sql """
+        CREATE TABLE windowfunnel_v2_test(
+            user_id BIGINT,
+            event_name VARCHAR(64),
+            event_timestamp datetime,
+            phone_brand varchar(64),
+            tab_num int
+        ) distributed by hash(user_id) buckets 3 
properties("replication_num"="1");
+    """
+    sql """
+        INSERT INTO windowfunnel_v2_test VALUES
+            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
+            (100123, '登录', '2022-05-14 10:04:00', 'HONOR', 3),
+            (100123, '登录1', '2022-05-14 10:04:00', 'HONOR', 3),
+            (100123, '登录2', '2022-05-14 10:04:00', 'HONOR', 3),
+            (100123, '登录3', '2022-05-14 10:04:00', 'HONOR', 3),
+            (100123, '登录4', '2022-05-14 10:04:00', 'HONOR', 3),
+            (100123, '登录5', '2022-05-14 10:04:00', 'HONOR', 3),
+            (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4),
+            (100125, '登录', '2022-05-15 11:00:00', 'XIAOMI', 1),
+            (100125, '访问', '2022-05-15 11:01:00', 'XIAOMI', 2),
+            (100125, '下单', '2022-05-15 11:02:00', 'XIAOMI', 6),
+            (100126, '登录', '2022-05-15 12:00:00', 'IPHONE', 1),
+            (100126, '访问', '2022-05-15 12:01:00', 'HONOR', 2),
+            (100127, '登录', '2022-05-15 11:30:00', 'VIVO', 1),
+            (100127, '访问', '2022-05-15 11:31:00', 'VIVO', 5);
+    """
+    order_qt_v2_deduplication0 """
+        SELECT
+            user_id,
+            window_funnel(3600, "deduplication", event_timestamp, event_name = 
'登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id
+    """
+
+    // Test dedup with duplicate event2 (访问)
+    sql """ truncate table windowfunnel_v2_test; """
+    sql """
+        INSERT INTO windowfunnel_v2_test VALUES
+            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
+            (100123, '登录1', '2022-05-14 10:04:00', 'HONOR', 3),
+            (100123, '访问', '2022-05-14 10:04:00', 'HONOR', 3),
+            (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4),
+            (100125, '登录', '2022-05-15 11:00:00', 'XIAOMI', 1),
+            (100125, '访问', '2022-05-15 11:01:00', 'XIAOMI', 2),
+            (100125, '下单', '2022-05-15 11:02:00', 'XIAOMI', 6),
+            (100126, '登录', '2022-05-15 12:00:00', 'IPHONE', 1),
+            (100126, '访问', '2022-05-15 12:01:00', 'HONOR', 2),
+            (100127, '登录', '2022-05-15 11:30:00', 'VIVO', 1),
+            (100127, '访问', '2022-05-15 11:31:00', 'VIVO', 5);
+    """
+    order_qt_v2_deduplication1 """
+        SELECT
+            user_id,
+            window_funnel(3600, "deduplication", event_timestamp, event_name = 
'登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id
+    """
+
+    // ==================== FIXED mode tests (StarRocks-style semantics) ====================
+    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+    sql """
+        CREATE TABLE windowfunnel_v2_test(
+            user_id BIGINT,
+            event_name VARCHAR(64),
+            event_timestamp datetime,
+            phone_brand varchar(64),
+            tab_num int
+        ) distributed by hash(user_id) buckets 3 
properties("replication_num"="1");
+    """
+    sql """
+        INSERT INTO windowfunnel_v2_test VALUES
+            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
+            (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4),
+            (100125, '登录', '2022-05-15 11:00:00', 'XIAOMI', 1),
+            (100125, '访问', '2022-05-15 11:01:00', 'XIAOMI', 2),
+            (100125, '下单', '2022-05-15 11:02:00', 'XIAOMI', 6),
+            (100126, '登录', '2022-05-15 12:00:00', 'IPHONE', 1),
+            (100126, '访问', '2022-05-15 12:01:00', 'HONOR', 2),
+            (100127, '登录', '2022-05-15 11:30:00', 'VIVO', 1),
+            (100127, '访问', '2022-05-15 11:31:00', 'VIVO', 5);
+    """
+    // Note: In V2 fixed mode (StarRocks-style), unmatched rows don't break the chain.
+    // The chain only breaks when a matched event's predecessor level hasn't been matched.
+    order_qt_v2_fixed0 """
+        SELECT
+            user_id,
+            window_funnel(3600, "fixed", event_timestamp, event_name = '登录', 
event_name = '访问', event_name = '下单', event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id
+    """
+
+    // Test fixed mode where event order in conditions doesn't match data order
+    sql """ truncate table windowfunnel_v2_test; """
+    sql """
+        INSERT INTO windowfunnel_v2_test VALUES
+            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
+            (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4);
+    """
+    order_qt_v2_fixed_reorder """
+        select
+            window_funnel(
+                20000,
+                'fixed',
+                t.event_timestamp,
+                t.event_name = '登录',
+                t.event_name = '访问',
+                t.event_name = '付款',
+                t.event_name = '下单'
+            ) AS level
+        from windowfunnel_v2_test t;
+    """
+
+    // ==================== INCREASE mode tests ====================
+    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+    sql """
+        CREATE TABLE windowfunnel_v2_test(
+            user_id BIGINT,
+            event_name VARCHAR(64),
+            event_timestamp datetime,
+            phone_brand varchar(64),
+            tab_num int
+        ) distributed by hash(user_id) buckets 3 
properties("replication_num"="1");
+    """
+    sql """
+        INSERT INTO windowfunnel_v2_test VALUES
+            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
+            (100123, '付款', '2022-05-14 10:04:00', 'HONOR', 4),
+            (100125, '登录', '2022-05-15 11:00:00', 'XIAOMI', 1),
+            (100125, '访问', '2022-05-15 11:01:00', 'XIAOMI', 2),
+            (100125, '下单', '2022-05-15 11:02:00', 'XIAOMI', 6),
+            (100126, '登录', '2022-05-15 12:00:00', 'IPHONE', 1),
+            (100126, '访问', '2022-05-15 12:01:00', 'HONOR', 2),
+            (100127, '登录', '2022-05-15 11:30:00', 'VIVO', 1),
+            (100127, '访问', '2022-05-15 11:31:00', 'VIVO', 5);
+    """
+    order_qt_v2_increase0 """
+        SELECT
+            user_id,
+            window_funnel(3600, "increase", event_timestamp, event_name = 
'登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id
+    """
+
+    // Test increase mode with same-timestamp events
+    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+    sql """
+        CREATE TABLE IF NOT EXISTS windowfunnel_v2_test (
+            xwho varchar(50) NULL COMMENT 'xwho',
+            xwhen datetimev2(3) COMMENT 'xwhen',
+            xwhat int NULL COMMENT 'xwhat'
+        )
+        DUPLICATE KEY(xwho)
+        DISTRIBUTED BY HASH(xwho) BUCKETS 3
+        PROPERTIES (
+        "replication_num" = "1"
+        );
+    """
+    sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:41:00.111111', 1)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 13:28:02.111111', 2)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 13:28:02.111111', 3)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 15:05:04.111111', 4)"
+    order_qt_v2_increase_same_ts """
+        select
+            window_funnel(
+                20000,
+                'increase',
+                t.xwhen,
+                t.xwhat = 1,
+                t.xwhat = 2,
+                t.xwhat = 3,
+                t.xwhat = 4
+            ) AS level
+        from windowfunnel_v2_test t;
+    """
+
+    // ==================== V2 FIXED mode key difference from V1 ====================
+    // In V1, unmatched rows (rows that match no event condition) break the chain in FIXED mode.
+    // In V2, unmatched rows are not stored, so only matched events with level jumps break the chain.
+    // This test shows the behavioral difference.
+    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+    sql """
+        CREATE TABLE windowfunnel_v2_test(
+            user_id BIGINT,
+            event_name VARCHAR(64),
+            event_timestamp datetime,
+            phone_brand varchar(64),
+            tab_num int
+        ) distributed by hash(user_id) buckets 3 
properties("replication_num"="1");
+    """
+    sql """
+        INSERT INTO windowfunnel_v2_test VALUES
+            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+            (100123, '登录2', '2022-05-14 10:03:00', 'HONOR', 3),
+            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
+            (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4);
+    """
+    // V2 fixed mode: 登录2 doesn't match any condition, so it's not stored.
+    // The chain 登录->访问->下单->付款 is unbroken because there are no level jumps.
+    // V1 would return 2 here (登录2 physically breaks adjacency), V2 returns 4.
+    order_qt_v2_fixed_vs_v1 """
+        SELECT
+            user_id,
+            window_funnel(3600, "fixed", event_timestamp, event_name = '登录', 
event_name = '访问', event_name = '下单', event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id
+    """
+
+    // ==================== Test using window_funnel_v2 explicit name ====================
+    order_qt_v2_explicit_name """
+        SELECT
+            user_id,
+            window_funnel_v2(3600, "fixed", event_timestamp, event_name = 
'登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id
+    """
+
+    // ==================== INCREASE mode: event-0 re-occurrence bug fix ====================
+    // Regression test for the bug where a later event-0 overwrites events_timestamp[0]
+    // and breaks the INCREASE mode strict-increase check for an already-valid chain.
+    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+    sql """
+        CREATE TABLE IF NOT EXISTS windowfunnel_v2_test (
+            xwho varchar(50) NULL COMMENT 'xwho',
+            xwhen datetimev2(3) COMMENT 'xwhen',
+            xwhat int NULL COMMENT 'xwhat'
+        )
+        DUPLICATE KEY(xwho)
+        DISTRIBUTED BY HASH(xwho) BUCKETS 3
+        PROPERTIES (
+        "replication_num" = "1"
+        );
+    """
+    // Case 1: Old chain (from t=0) is valid and completes all 3 levels.
+    // The duplicate event-0 at t=50 should not destroy it.
+    sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:00:00.000', 1)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:00:50.000', 1)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:00:50.000', 2)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:01:00.000', 3)"
+    order_qt_v2_increase_event0_overwrite """
+        select
+            window_funnel(
+                100,
+                'increase',
+                t.xwhen,
+                t.xwhat = 1,
+                t.xwhat = 2,
+                t.xwhat = 3
+            ) AS level
+        from windowfunnel_v2_test t;
+    """
+
+    // Case 2: Old chain can't complete (window too small), new chain is better.
+    sql """ truncate table windowfunnel_v2_test; """
+    sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:00:00.000', 1)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:00:50.000', 1)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:00:51.000', 2)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:00:52.000', 3)"
+    order_qt_v2_increase_new_chain_better """
+        select
+            window_funnel(
+                5,
+                'increase',
+                t.xwhen,
+                t.xwhat = 1,
+                t.xwhat = 2,
+                t.xwhat = 3
+            ) AS level
+        from windowfunnel_v2_test t;
+    """
+
+    // Case 3: Old chain is better (reached level 2), new chain only reaches level 1.
+    sql """ truncate table windowfunnel_v2_test; """
+    sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:00:00.000', 1)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:00:10.000', 2)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:00:50.000', 1)"
+    order_qt_v2_increase_old_chain_better """
+        select
+            window_funnel(
+                100,
+                'increase',
+                t.xwhen,
+                t.xwhat = 1,
+                t.xwhat = 2,
+                t.xwhat = 3
+            ) AS level
+        from windowfunnel_v2_test t;
+    """
+
+    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """

Review Comment:
   **[Medium]** Per Doris testing standards, tables should NOT be dropped after 
tests — they should be preserved for debugging. The `DROP TABLE IF EXISTS` 
before each test section is correct, but this final drop at the end of the 
suite should be removed.



##########
be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h:
##########
@@ -0,0 +1,599 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <iterator>
+#include <utility>
+#include <vector>
+
+#include "common/cast_set.h"
+#include "common/exception.h"
+#include "common/status.h"
+#include "core/assert_cast.h"
+#include "core/column/column_string.h"
+#include "core/data_type/data_type_number.h"
+#include "core/types.h"
+#include "core/value/vdatetime_value.h"
+#include "exprs/aggregate/aggregate_function.h"
+#include "exprs/aggregate/aggregate_function_window_funnel.h" // for 
WindowFunnelMode, string_to_window_funnel_mode
+#include "util/var_int.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class Arena;
+class BufferReadable;
+class BufferWritable;
+class IColumn;
+} // namespace doris
+
+namespace doris {
+
+/// Merge two event lists, utilizing sorted flags to optimize.
+/// After merge, all events are in `events_list` and it is sorted.
+template <typename T>
+void merge_events_list(T& events_list, size_t prefix_size, bool prefix_sorted, 
bool suffix_sorted) {
+    if (!prefix_sorted && !suffix_sorted) {
+        std::stable_sort(std::begin(events_list), std::end(events_list));
+    } else {
+        const auto begin = std::begin(events_list);
+        const auto middle = std::next(begin, prefix_size);
+        const auto end = std::end(events_list);
+
+        if (!prefix_sorted) {
+            std::stable_sort(begin, middle);
+        }
+        if (!suffix_sorted) {
+            std::stable_sort(middle, end);
+        }
+        std::inplace_merge(begin, middle, end);

Review Comment:
   **[Low: Documentation/Robustness]** The correctness of continuation flags 
after `merge()` + `inplace_merge()` relies critically on the stability 
guarantee of `std::inplace_merge` — that equal-timestamp elements from the 
prefix range always come before those from the suffix range, preserving 
same-row event groups intact. Consider adding a more prominent comment 
documenting this invariant near the `inplace_merge` call in the `merge()` 
method (~line 182), e.g.:
   
   ```cpp
   // CRITICAL: std::inplace_merge is stable, ensuring same-row events
   // (which share timestamps) remain consecutive. Do NOT replace with a
   // non-stable merge.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to