github-actions[bot] commented on code in PR #23053:
URL: https://github.com/apache/doris/pull/23053#discussion_r1334703244


##########
be/test/olap/wal_reader_writer_test.cpp:
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <gtest/gtest.h>
+
+#include <filesystem>
+
+#include "agent/be_exec_version_manager.h"
+#include "common/object_pool.h"
+#include "gen_cpp/internal_service.pb.h"
+#include "gmock/gmock.h"
+#include "io/fs/local_file_system.h"
+#include "olap/wal_reader.h"
+#include "olap/wal_writer.h"
+#include "runtime/exec_env.h"
+#include "service/brpc.h"
+#include "testutil/test_util.h"
+#include "util/proto_util.h"
+#include "vec/columns/columns_number.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/runtime/vdata_stream_mgr.h"
+#include "vec/runtime/vdata_stream_recvr.h"
+
+using ::testing::_;
+using ::testing::Return;
+using ::testing::SetArgPointee;
+using std::string;
+
+namespace doris {
+
+class WalReaderWriterTest : public testing::Test {
+public:
+    // create a mock cgroup folder
+    virtual void SetUp() { 
io::global_local_filesystem()->create_directory(_s_test_data_path); }
+
+    // delete the mock cgroup folder
+    virtual void TearDown() { 
io::global_local_filesystem()->delete_directory(_s_test_data_path); }
+
+    static std::string _s_test_data_path;
+};
+
+std::string WalReaderWriterTest::_s_test_data_path = 
"./log/wal_reader_writer_test";
+size_t block_rows = 1024;
+
+void covert_block_to_pb(
+        const vectorized::Block& block, PBlock* pblock,
+        segment_v2::CompressionTypePB compression_type = 
segment_v2::CompressionTypePB::SNAPPY) {
+    size_t uncompressed_bytes = 0;
+    size_t compressed_bytes = 0;
+    Status st = block.serialize(BeExecVersionManager::get_newest_version(), 
pblock,
+                                &uncompressed_bytes, &compressed_bytes, 
compression_type);
+    EXPECT_TRUE(st.ok());
+    EXPECT_TRUE(uncompressed_bytes >= compressed_bytes);
+    EXPECT_EQ(compressed_bytes, pblock->column_values().size());
+
+    const vectorized::ColumnWithTypeAndName& type_and_name =
+            block.get_columns_with_type_and_name()[0];
+    EXPECT_EQ(type_and_name.name, pblock->column_metas()[0].name());
+}
+
+void generate_block(PBlock& pblock, int row_index) {
+    auto vec = vectorized::ColumnVector<int32_t>::create();
+    auto& data = vec->get_data();
+    for (int i = 0; i < block_rows; ++i) {
+        data.push_back(i + row_index);
+    }
+    vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeInt32>());
+    vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, 
"test_int");
+    vectorized::Block block({type_and_name});
+    covert_block_to_pb(block, &pblock, segment_v2::CompressionTypePB::SNAPPY);
+}
+
+TEST_F(WalReaderWriterTest, TestWriteAndRead1) {
+    std::string file_name = _s_test_data_path + "/abcd123.txt";

Review Comment:
   warning: variable 'file_name' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       std::string file_name = 0 = _s_test_data_path + "/abcd123.txt";
   ```
   



##########
be/test/vec/exec/vtablet_sink_test.cpp:
##########
@@ -835,5 +844,259 @@ TEST_F(VOlapTableSinkTest, decimal) {
     ASSERT_TRUE(output_set.count("(12, 12.300000000)") > 0);
     ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0);
 }
+
+TEST_F(VOlapTableSinkTest, group_commit) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;

Review Comment:
   warning: variable 'query_options' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       TQueryOptions query_options = 0;
   ```
   



##########
be/test/vec/exec/vtablet_sink_test.cpp:
##########
@@ -835,5 +844,259 @@ TEST_F(VOlapTableSinkTest, decimal) {
     ASSERT_TRUE(output_set.count("(12, 12.300000000)") > 0);
     ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0);
 }
+
+TEST_F(VOlapTableSinkTest, group_commit) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;

Review Comment:
   warning: variable 'fragment_id' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       TUniqueId fragment_id = 0;
   ```
   



##########
be/test/vec/exec/vtablet_sink_test.cpp:
##########
@@ -835,5 +844,259 @@ TEST_F(VOlapTableSinkTest, decimal) {
     ASSERT_TRUE(output_set.count("(12, 12.300000000)") > 0);
     ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0);
 }
+
+TEST_F(VOlapTableSinkTest, group_commit) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;

Review Comment:
   warning: variable 'options' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       brpc::ServerOptions options = 0;
   ```
   



##########
be/test/vec/exec/vtablet_sink_test.cpp:
##########
@@ -835,5 +844,259 @@ TEST_F(VOlapTableSinkTest, decimal) {
     ASSERT_TRUE(output_set.count("(12, 12.300000000)") > 0);
     ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0);
 }
+
+TEST_F(VOlapTableSinkTest, group_commit) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;

Review Comment:
   warning: variable 'tdesc_tbl' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       TDescriptorTable tdesc_tbl = 0;
   ```
   



##########
be/test/vec/exec/vtablet_sink_test.cpp:
##########
@@ -835,5 +844,259 @@ TEST_F(VOlapTableSinkTest, decimal) {
     ASSERT_TRUE(output_set.count("(12, 12.300000000)") > 0);
     ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0);
 }
+
+TEST_F(VOlapTableSinkTest, group_commit) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;
+    auto t_data_sink = get_data_sink(&tdesc_tbl);
+
+    // crate desc_tabl
+    DescriptorTbl* desc_tbl = nullptr;
+    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    ASSERT_TRUE(st.ok());
+    state._desc_tbl = desc_tbl;
+    state._wal_id = 789;
+    state._import_label = "test";
+
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
+
+    RowDescriptor row_desc(*desc_tbl, {0}, {false});
+    service->_row_desc = &row_desc;
+    std::set<std::string> output_set;
+    service->_output_set = &output_set;
+
+    std::vector<TExpr> exprs;
+    VOlapTableSink sink(&obj_pool, row_desc, exprs, true);
+
+    // init
+    st = sink.init(t_data_sink);
+    ASSERT_TRUE(st.ok());
+    // prepare
+    st = sink.prepare(&state);
+    ASSERT_TRUE(st.ok());
+    // open
+    st = sink.open(&state);
+    ASSERT_TRUE(st.ok());
+
+    int slot_count = tuple_desc->slots().size();
+    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
+    for (int i = 0; i < slot_count; i++) {
+        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
+    }
+
+    int col_idx = 0;
+    auto* column_ptr = columns[col_idx++].get();
+    auto column_vector_int = column_ptr;
+    int int_val = 12;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 13;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 14;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_bigint = column_ptr;
+    int64_t int64_val = 9;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 25;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 50;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_str = column_ptr;
+    column_vector_str->insert_data("abc", 3);
+    column_vector_str->insert_data("abcd", 4);
+    column_vector_str->insert_data("1234567890", 10);
+
+    vectorized::Block block;
+    col_idx = 0;
+    for (const auto slot_desc : tuple_desc->slots()) {
+        
block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]),
+                                                       
slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+    vectorized::Block org_block(block);
+
+    // send
+    st = sink.send(&state, &block);
+    ASSERT_TRUE(st.ok());
+    // close
+    st = sink.close(&state, Status::OK());
+    ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close 
failed. ")
+            << st.to_string();
+
+    // each node has a eof
+    ASSERT_EQ(2, service->_eof_counters);
+    ASSERT_EQ(2 * 3, service->_row_counters);
+
+    // 2node * 2
+    ASSERT_EQ(0, state.num_rows_load_filtered());
+
+    std::string wal_path = wal_dir + "/" + 
std::to_string(t_data_sink.olap_table_sink.db_id) + "/" +
+                           
std::to_string(t_data_sink.olap_table_sink.table_id) + "/" +
+                           std::to_string(t_data_sink.olap_table_sink.txn_id) 
+ "_" +
+                           state.import_label();
+    doris::PBlock pblock;
+    auto wal_reader = WalReader(wal_path);
+    st = wal_reader.init();
+    ASSERT_TRUE(st.ok());
+    st = wal_reader.read_block(pblock);
+    ASSERT_TRUE(st.ok());
+    vectorized::Block wal_block;
+    wal_block.deserialize(pblock);
+    ASSERT_TRUE(st.ok() || st.is<ErrorCode::END_OF_FILE>());
+    ASSERT_EQ(org_block.rows(), wal_block.rows());
+    for (int i = 0; i < org_block.rows(); i++) {
+        std::string srcRow = org_block.dump_one_line(i, org_block.columns());

Review Comment:
   warning: variable 'srcRow' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
           std::string srcRow = 0 = org_block.dump_one_line(i, 
org_block.columns());
   ```
   



##########
be/test/vec/exec/vtablet_sink_test.cpp:
##########
@@ -835,5 +844,259 @@ TEST_F(VOlapTableSinkTest, decimal) {
     ASSERT_TRUE(output_set.count("(12, 12.300000000)") > 0);
     ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0);
 }
+
+TEST_F(VOlapTableSinkTest, group_commit) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;
+    auto t_data_sink = get_data_sink(&tdesc_tbl);
+
+    // crate desc_tabl
+    DescriptorTbl* desc_tbl = nullptr;
+    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    ASSERT_TRUE(st.ok());
+    state._desc_tbl = desc_tbl;
+    state._wal_id = 789;
+    state._import_label = "test";
+
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
+
+    RowDescriptor row_desc(*desc_tbl, {0}, {false});
+    service->_row_desc = &row_desc;
+    std::set<std::string> output_set;
+    service->_output_set = &output_set;
+
+    std::vector<TExpr> exprs;
+    VOlapTableSink sink(&obj_pool, row_desc, exprs, true);
+
+    // init
+    st = sink.init(t_data_sink);
+    ASSERT_TRUE(st.ok());
+    // prepare
+    st = sink.prepare(&state);
+    ASSERT_TRUE(st.ok());
+    // open
+    st = sink.open(&state);
+    ASSERT_TRUE(st.ok());
+
+    int slot_count = tuple_desc->slots().size();
+    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
+    for (int i = 0; i < slot_count; i++) {
+        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
+    }
+
+    int col_idx = 0;
+    auto* column_ptr = columns[col_idx++].get();
+    auto column_vector_int = column_ptr;
+    int int_val = 12;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 13;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 14;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_bigint = column_ptr;
+    int64_t int64_val = 9;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 25;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 50;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_str = column_ptr;
+    column_vector_str->insert_data("abc", 3);
+    column_vector_str->insert_data("abcd", 4);
+    column_vector_str->insert_data("1234567890", 10);
+
+    vectorized::Block block;
+    col_idx = 0;
+    for (const auto slot_desc : tuple_desc->slots()) {
+        
block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]),
+                                                       
slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+    vectorized::Block org_block(block);
+
+    // send
+    st = sink.send(&state, &block);
+    ASSERT_TRUE(st.ok());
+    // close
+    st = sink.close(&state, Status::OK());
+    ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close 
failed. ")
+            << st.to_string();
+
+    // each node has a eof
+    ASSERT_EQ(2, service->_eof_counters);
+    ASSERT_EQ(2 * 3, service->_row_counters);
+
+    // 2node * 2
+    ASSERT_EQ(0, state.num_rows_load_filtered());
+
+    std::string wal_path = wal_dir + "/" + 
std::to_string(t_data_sink.olap_table_sink.db_id) + "/" +
+                           
std::to_string(t_data_sink.olap_table_sink.table_id) + "/" +
+                           std::to_string(t_data_sink.olap_table_sink.txn_id) 
+ "_" +
+                           state.import_label();
+    doris::PBlock pblock;
+    auto wal_reader = WalReader(wal_path);
+    st = wal_reader.init();
+    ASSERT_TRUE(st.ok());
+    st = wal_reader.read_block(pblock);
+    ASSERT_TRUE(st.ok());
+    vectorized::Block wal_block;
+    wal_block.deserialize(pblock);
+    ASSERT_TRUE(st.ok() || st.is<ErrorCode::END_OF_FILE>());
+    ASSERT_EQ(org_block.rows(), wal_block.rows());
+    for (int i = 0; i < org_block.rows(); i++) {
+        std::string srcRow = org_block.dump_one_line(i, org_block.columns());
+        std::string walRow = wal_block.dump_one_line(i, org_block.columns());
+        ASSERT_TRUE(std::strcmp(srcRow.c_str(), walRow.c_str()) == 0);
+    }
+}
+
+TEST_F(VOlapTableSinkTest, group_commit_with_filter_row) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;

Review Comment:
   warning: variable 'tdesc_tbl' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       TDescriptorTable tdesc_tbl = 0;
   ```
   



##########
be/test/vec/exec/vtablet_sink_test.cpp:
##########
@@ -835,5 +844,259 @@ TEST_F(VOlapTableSinkTest, decimal) {
     ASSERT_TRUE(output_set.count("(12, 12.300000000)") > 0);
     ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0);
 }
+
+TEST_F(VOlapTableSinkTest, group_commit) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;
+    auto t_data_sink = get_data_sink(&tdesc_tbl);
+
+    // crate desc_tabl
+    DescriptorTbl* desc_tbl = nullptr;
+    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    ASSERT_TRUE(st.ok());
+    state._desc_tbl = desc_tbl;
+    state._wal_id = 789;
+    state._import_label = "test";
+
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
+
+    RowDescriptor row_desc(*desc_tbl, {0}, {false});
+    service->_row_desc = &row_desc;
+    std::set<std::string> output_set;
+    service->_output_set = &output_set;
+
+    std::vector<TExpr> exprs;
+    VOlapTableSink sink(&obj_pool, row_desc, exprs, true);
+
+    // init
+    st = sink.init(t_data_sink);
+    ASSERT_TRUE(st.ok());
+    // prepare
+    st = sink.prepare(&state);
+    ASSERT_TRUE(st.ok());
+    // open
+    st = sink.open(&state);
+    ASSERT_TRUE(st.ok());
+
+    int slot_count = tuple_desc->slots().size();
+    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
+    for (int i = 0; i < slot_count; i++) {
+        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
+    }
+
+    int col_idx = 0;
+    auto* column_ptr = columns[col_idx++].get();
+    auto column_vector_int = column_ptr;
+    int int_val = 12;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 13;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 14;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_bigint = column_ptr;
+    int64_t int64_val = 9;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 25;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 50;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_str = column_ptr;
+    column_vector_str->insert_data("abc", 3);
+    column_vector_str->insert_data("abcd", 4);
+    column_vector_str->insert_data("1234567890", 10);
+
+    vectorized::Block block;
+    col_idx = 0;
+    for (const auto slot_desc : tuple_desc->slots()) {
+        
block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]),
+                                                       
slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+    vectorized::Block org_block(block);
+
+    // send
+    st = sink.send(&state, &block);
+    ASSERT_TRUE(st.ok());
+    // close
+    st = sink.close(&state, Status::OK());
+    ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close 
failed. ")
+            << st.to_string();
+
+    // each node has a eof
+    ASSERT_EQ(2, service->_eof_counters);
+    ASSERT_EQ(2 * 3, service->_row_counters);
+
+    // 2node * 2
+    ASSERT_EQ(0, state.num_rows_load_filtered());
+
+    std::string wal_path = wal_dir + "/" + 
std::to_string(t_data_sink.olap_table_sink.db_id) + "/" +
+                           
std::to_string(t_data_sink.olap_table_sink.table_id) + "/" +
+                           std::to_string(t_data_sink.olap_table_sink.txn_id) 
+ "_" +
+                           state.import_label();
+    doris::PBlock pblock;
+    auto wal_reader = WalReader(wal_path);
+    st = wal_reader.init();
+    ASSERT_TRUE(st.ok());
+    st = wal_reader.read_block(pblock);
+    ASSERT_TRUE(st.ok());
+    vectorized::Block wal_block;
+    wal_block.deserialize(pblock);
+    ASSERT_TRUE(st.ok() || st.is<ErrorCode::END_OF_FILE>());
+    ASSERT_EQ(org_block.rows(), wal_block.rows());
+    for (int i = 0; i < org_block.rows(); i++) {
+        std::string srcRow = org_block.dump_one_line(i, org_block.columns());
+        std::string walRow = wal_block.dump_one_line(i, org_block.columns());
+        ASSERT_TRUE(std::strcmp(srcRow.c_str(), walRow.c_str()) == 0);
+    }
+}
+
+TEST_F(VOlapTableSinkTest, group_commit_with_filter_row) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;

Review Comment:
   warning: variable 'query_options' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       TQueryOptions query_options = 0;
   ```
   



##########
be/test/vec/exec/vtablet_sink_test.cpp:
##########
@@ -835,5 +844,259 @@ TEST_F(VOlapTableSinkTest, decimal) {
     ASSERT_TRUE(output_set.count("(12, 12.300000000)") > 0);
     ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0);
 }
+
+TEST_F(VOlapTableSinkTest, group_commit) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;
+    auto t_data_sink = get_data_sink(&tdesc_tbl);
+
+    // crate desc_tabl
+    DescriptorTbl* desc_tbl = nullptr;
+    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    ASSERT_TRUE(st.ok());
+    state._desc_tbl = desc_tbl;
+    state._wal_id = 789;
+    state._import_label = "test";
+
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
+
+    RowDescriptor row_desc(*desc_tbl, {0}, {false});
+    service->_row_desc = &row_desc;
+    std::set<std::string> output_set;
+    service->_output_set = &output_set;
+
+    std::vector<TExpr> exprs;
+    VOlapTableSink sink(&obj_pool, row_desc, exprs, true);
+
+    // init
+    st = sink.init(t_data_sink);
+    ASSERT_TRUE(st.ok());
+    // prepare
+    st = sink.prepare(&state);
+    ASSERT_TRUE(st.ok());
+    // open
+    st = sink.open(&state);
+    ASSERT_TRUE(st.ok());
+
+    int slot_count = tuple_desc->slots().size();
+    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
+    for (int i = 0; i < slot_count; i++) {
+        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
+    }
+
+    int col_idx = 0;
+    auto* column_ptr = columns[col_idx++].get();
+    auto column_vector_int = column_ptr;
+    int int_val = 12;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 13;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 14;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_bigint = column_ptr;
+    int64_t int64_val = 9;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 25;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 50;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_str = column_ptr;
+    column_vector_str->insert_data("abc", 3);
+    column_vector_str->insert_data("abcd", 4);
+    column_vector_str->insert_data("1234567890", 10);
+
+    vectorized::Block block;
+    col_idx = 0;
+    for (const auto slot_desc : tuple_desc->slots()) {
+        
block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]),
+                                                       
slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+    vectorized::Block org_block(block);
+
+    // send
+    st = sink.send(&state, &block);
+    ASSERT_TRUE(st.ok());
+    // close
+    st = sink.close(&state, Status::OK());
+    ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close 
failed. ")
+            << st.to_string();
+
+    // each node has a eof
+    ASSERT_EQ(2, service->_eof_counters);
+    ASSERT_EQ(2 * 3, service->_row_counters);
+
+    // 2node * 2
+    ASSERT_EQ(0, state.num_rows_load_filtered());
+
+    std::string wal_path = wal_dir + "/" + 
std::to_string(t_data_sink.olap_table_sink.db_id) + "/" +
+                           
std::to_string(t_data_sink.olap_table_sink.table_id) + "/" +
+                           std::to_string(t_data_sink.olap_table_sink.txn_id) 
+ "_" +
+                           state.import_label();
+    doris::PBlock pblock;
+    auto wal_reader = WalReader(wal_path);
+    st = wal_reader.init();
+    ASSERT_TRUE(st.ok());
+    st = wal_reader.read_block(pblock);
+    ASSERT_TRUE(st.ok());
+    vectorized::Block wal_block;
+    wal_block.deserialize(pblock);
+    ASSERT_TRUE(st.ok() || st.is<ErrorCode::END_OF_FILE>());
+    ASSERT_EQ(org_block.rows(), wal_block.rows());
+    for (int i = 0; i < org_block.rows(); i++) {
+        std::string srcRow = org_block.dump_one_line(i, org_block.columns());
+        std::string walRow = wal_block.dump_one_line(i, org_block.columns());
+        ASSERT_TRUE(std::strcmp(srcRow.c_str(), walRow.c_str()) == 0);
+    }
+}
+
+TEST_F(VOlapTableSinkTest, group_commit_with_filter_row) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;
+    auto t_data_sink = get_data_sink(&tdesc_tbl);
+
+    // crate desc_tabl
+    DescriptorTbl* desc_tbl = nullptr;
+    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    ASSERT_TRUE(st.ok());
+    state._desc_tbl = desc_tbl;
+    state._wal_id = 789;
+    state._import_label = "test";
+
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
+
+    RowDescriptor row_desc(*desc_tbl, {0}, {false});
+    service->_row_desc = &row_desc;
+    std::set<std::string> output_set;

Review Comment:
   warning: variable 'output_set' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       std::set<std::string> output_set = 0;
   ```
   



##########
be/test/vec/exec/vtablet_sink_test.cpp:
##########
@@ -835,5 +844,259 @@ TEST_F(VOlapTableSinkTest, decimal) {
     ASSERT_TRUE(output_set.count("(12, 12.300000000)") > 0);
     ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0);
 }
+
+TEST_F(VOlapTableSinkTest, group_commit) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;
+    auto t_data_sink = get_data_sink(&tdesc_tbl);
+
+    // crate desc_tabl
+    DescriptorTbl* desc_tbl = nullptr;
+    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    ASSERT_TRUE(st.ok());
+    state._desc_tbl = desc_tbl;
+    state._wal_id = 789;
+    state._import_label = "test";
+
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
+
+    RowDescriptor row_desc(*desc_tbl, {0}, {false});
+    service->_row_desc = &row_desc;
+    std::set<std::string> output_set;
+    service->_output_set = &output_set;
+
+    std::vector<TExpr> exprs;
+    VOlapTableSink sink(&obj_pool, row_desc, exprs, true);
+
+    // init
+    st = sink.init(t_data_sink);
+    ASSERT_TRUE(st.ok());
+    // prepare
+    st = sink.prepare(&state);
+    ASSERT_TRUE(st.ok());
+    // open
+    st = sink.open(&state);
+    ASSERT_TRUE(st.ok());
+
+    int slot_count = tuple_desc->slots().size();
+    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
+    for (int i = 0; i < slot_count; i++) {
+        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
+    }
+
+    int col_idx = 0;
+    auto* column_ptr = columns[col_idx++].get();
+    auto column_vector_int = column_ptr;
+    int int_val = 12;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 13;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 14;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_bigint = column_ptr;
+    int64_t int64_val = 9;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 25;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 50;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_str = column_ptr;
+    column_vector_str->insert_data("abc", 3);
+    column_vector_str->insert_data("abcd", 4);
+    column_vector_str->insert_data("1234567890", 10);
+
+    vectorized::Block block;
+    col_idx = 0;
+    for (const auto slot_desc : tuple_desc->slots()) {
+        
block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]),
+                                                       
slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+    vectorized::Block org_block(block);
+
+    // send
+    st = sink.send(&state, &block);
+    ASSERT_TRUE(st.ok());
+    // close
+    st = sink.close(&state, Status::OK());
+    ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close 
failed. ")
+            << st.to_string();
+
+    // each node has a eof
+    ASSERT_EQ(2, service->_eof_counters);
+    ASSERT_EQ(2 * 3, service->_row_counters);
+
+    // 2node * 2
+    ASSERT_EQ(0, state.num_rows_load_filtered());
+
+    std::string wal_path = wal_dir + "/" + 
std::to_string(t_data_sink.olap_table_sink.db_id) + "/" +
+                           
std::to_string(t_data_sink.olap_table_sink.table_id) + "/" +
+                           std::to_string(t_data_sink.olap_table_sink.txn_id) 
+ "_" +
+                           state.import_label();
+    doris::PBlock pblock;
+    auto wal_reader = WalReader(wal_path);
+    st = wal_reader.init();
+    ASSERT_TRUE(st.ok());
+    st = wal_reader.read_block(pblock);
+    ASSERT_TRUE(st.ok());
+    vectorized::Block wal_block;
+    wal_block.deserialize(pblock);
+    ASSERT_TRUE(st.ok() || st.is<ErrorCode::END_OF_FILE>());
+    ASSERT_EQ(org_block.rows(), wal_block.rows());
+    for (int i = 0; i < org_block.rows(); i++) {
+        std::string srcRow = org_block.dump_one_line(i, org_block.columns());
+        std::string walRow = wal_block.dump_one_line(i, org_block.columns());
+        ASSERT_TRUE(std::strcmp(srcRow.c_str(), walRow.c_str()) == 0);
+    }
+}
+
+TEST_F(VOlapTableSinkTest, group_commit_with_filter_row) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;
+    auto t_data_sink = get_data_sink(&tdesc_tbl);
+
+    // crate desc_tabl
+    DescriptorTbl* desc_tbl = nullptr;
+    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    ASSERT_TRUE(st.ok());
+    state._desc_tbl = desc_tbl;
+    state._wal_id = 789;
+    state._import_label = "test";
+
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
+
+    RowDescriptor row_desc(*desc_tbl, {0}, {false});
+    service->_row_desc = &row_desc;
+    std::set<std::string> output_set;
+    service->_output_set = &output_set;
+
+    std::vector<TExpr> exprs;
+    VOlapTableSink sink(&obj_pool, row_desc, exprs, true);
+
+    // init
+    st = sink.init(t_data_sink);
+    ASSERT_TRUE(st.ok());
+    // prepare
+    st = sink.prepare(&state);
+    ASSERT_TRUE(st.ok());
+    // open
+    st = sink.open(&state);
+    ASSERT_TRUE(st.ok());
+
+    int slot_count = tuple_desc->slots().size();
+    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
+    for (int i = 0; i < slot_count; i++) {
+        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
+    }
+
+    int col_idx = 0;
+    auto* column_ptr = columns[col_idx++].get();
+    auto column_vector_int = column_ptr;
+    int int_val = 12;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 13;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 14;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_bigint = column_ptr;
+    int64_t int64_val = 9;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 25;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 50;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_str = column_ptr;
+    column_vector_str->insert_data("abc", 3);
+    column_vector_str->insert_data("abcd", 4);
+    column_vector_str->insert_data("abcde1234567890", 15);
+
+    vectorized::Block block;
+    col_idx = 0;
+    for (const auto slot_desc : tuple_desc->slots()) {
+        
block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]),
+                                                       
slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+    vectorized::Block org_block(block);
+
+    // send
+    st = sink.send(&state, &block);
+    ASSERT_TRUE(st.ok());
+    // close
+    st = sink.close(&state, Status::OK());
+    ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close 
failed. ")
+            << st.to_string();
+
+    // each node has a eof
+    ASSERT_EQ(2, service->_eof_counters);
+    ASSERT_EQ(2 * 2, service->_row_counters);
+
+    // 2node * 2
+    ASSERT_EQ(1, state.num_rows_load_filtered());
+
+    std::string wal_path = wal_dir + "/" + 
std::to_string(t_data_sink.olap_table_sink.db_id) + "/" +

Review Comment:
   warning: variable 'wal_path' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       std::string wal_path = 0 = wal_dir + "/" + 
std::to_string(t_data_sink.olap_table_sink.db_id) + "/" +
   ```
   



##########
be/test/vec/exec/vtablet_sink_test.cpp:
##########
@@ -835,5 +844,259 @@ TEST_F(VOlapTableSinkTest, decimal) {
     ASSERT_TRUE(output_set.count("(12, 12.300000000)") > 0);
     ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0);
 }
+
+TEST_F(VOlapTableSinkTest, group_commit) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;
+    auto t_data_sink = get_data_sink(&tdesc_tbl);
+
+    // crate desc_tabl
+    DescriptorTbl* desc_tbl = nullptr;
+    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    ASSERT_TRUE(st.ok());
+    state._desc_tbl = desc_tbl;
+    state._wal_id = 789;
+    state._import_label = "test";
+
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
+
+    RowDescriptor row_desc(*desc_tbl, {0}, {false});
+    service->_row_desc = &row_desc;
+    std::set<std::string> output_set;
+    service->_output_set = &output_set;
+
+    std::vector<TExpr> exprs;
+    VOlapTableSink sink(&obj_pool, row_desc, exprs, true);
+
+    // init
+    st = sink.init(t_data_sink);
+    ASSERT_TRUE(st.ok());
+    // prepare
+    st = sink.prepare(&state);
+    ASSERT_TRUE(st.ok());
+    // open
+    st = sink.open(&state);
+    ASSERT_TRUE(st.ok());
+
+    int slot_count = tuple_desc->slots().size();
+    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
+    for (int i = 0; i < slot_count; i++) {
+        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
+    }
+
+    int col_idx = 0;
+    auto* column_ptr = columns[col_idx++].get();
+    auto column_vector_int = column_ptr;
+    int int_val = 12;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 13;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 14;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_bigint = column_ptr;
+    int64_t int64_val = 9;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 25;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 50;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_str = column_ptr;
+    column_vector_str->insert_data("abc", 3);
+    column_vector_str->insert_data("abcd", 4);
+    column_vector_str->insert_data("1234567890", 10);
+
+    vectorized::Block block;
+    col_idx = 0;
+    for (const auto slot_desc : tuple_desc->slots()) {
+        
block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]),
+                                                       
slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+    vectorized::Block org_block(block);
+
+    // send
+    st = sink.send(&state, &block);
+    ASSERT_TRUE(st.ok());
+    // close
+    st = sink.close(&state, Status::OK());
+    ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close 
failed. ")
+            << st.to_string();
+
+    // each node has a eof
+    ASSERT_EQ(2, service->_eof_counters);
+    ASSERT_EQ(2 * 3, service->_row_counters);
+
+    // 2node * 2
+    ASSERT_EQ(0, state.num_rows_load_filtered());
+
+    std::string wal_path = wal_dir + "/" + 
std::to_string(t_data_sink.olap_table_sink.db_id) + "/" +
+                           
std::to_string(t_data_sink.olap_table_sink.table_id) + "/" +
+                           std::to_string(t_data_sink.olap_table_sink.txn_id) 
+ "_" +
+                           state.import_label();
+    doris::PBlock pblock;
+    auto wal_reader = WalReader(wal_path);
+    st = wal_reader.init();
+    ASSERT_TRUE(st.ok());
+    st = wal_reader.read_block(pblock);
+    ASSERT_TRUE(st.ok());
+    vectorized::Block wal_block;
+    wal_block.deserialize(pblock);
+    ASSERT_TRUE(st.ok() || st.is<ErrorCode::END_OF_FILE>());
+    ASSERT_EQ(org_block.rows(), wal_block.rows());
+    for (int i = 0; i < org_block.rows(); i++) {
+        std::string srcRow = org_block.dump_one_line(i, org_block.columns());
+        std::string walRow = wal_block.dump_one_line(i, org_block.columns());
+        ASSERT_TRUE(std::strcmp(srcRow.c_str(), walRow.c_str()) == 0);
+    }
+}
+
+TEST_F(VOlapTableSinkTest, group_commit_with_filter_row) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;
+    auto t_data_sink = get_data_sink(&tdesc_tbl);
+
+    // crate desc_tabl
+    DescriptorTbl* desc_tbl = nullptr;
+    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    ASSERT_TRUE(st.ok());
+    state._desc_tbl = desc_tbl;
+    state._wal_id = 789;
+    state._import_label = "test";
+
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
+
+    RowDescriptor row_desc(*desc_tbl, {0}, {false});
+    service->_row_desc = &row_desc;
+    std::set<std::string> output_set;
+    service->_output_set = &output_set;
+
+    std::vector<TExpr> exprs;
+    VOlapTableSink sink(&obj_pool, row_desc, exprs, true);
+
+    // init
+    st = sink.init(t_data_sink);
+    ASSERT_TRUE(st.ok());
+    // prepare
+    st = sink.prepare(&state);
+    ASSERT_TRUE(st.ok());
+    // open
+    st = sink.open(&state);
+    ASSERT_TRUE(st.ok());
+
+    int slot_count = tuple_desc->slots().size();
+    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
+    for (int i = 0; i < slot_count; i++) {
+        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
+    }
+
+    int col_idx = 0;
+    auto* column_ptr = columns[col_idx++].get();
+    auto column_vector_int = column_ptr;
+    int int_val = 12;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 13;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 14;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_bigint = column_ptr;
+    int64_t int64_val = 9;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 25;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 50;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_str = column_ptr;
+    column_vector_str->insert_data("abc", 3);
+    column_vector_str->insert_data("abcd", 4);
+    column_vector_str->insert_data("abcde1234567890", 15);
+
+    vectorized::Block block;
+    col_idx = 0;
+    for (const auto slot_desc : tuple_desc->slots()) {
+        
block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]),
+                                                       
slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+    vectorized::Block org_block(block);
+
+    // send
+    st = sink.send(&state, &block);
+    ASSERT_TRUE(st.ok());
+    // close
+    st = sink.close(&state, Status::OK());
+    ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close 
failed. ")
+            << st.to_string();
+
+    // each node has a eof
+    ASSERT_EQ(2, service->_eof_counters);
+    ASSERT_EQ(2 * 2, service->_row_counters);
+
+    // 2node * 2
+    ASSERT_EQ(1, state.num_rows_load_filtered());
+
+    std::string wal_path = wal_dir + "/" + 
std::to_string(t_data_sink.olap_table_sink.db_id) + "/" +
+                           
std::to_string(t_data_sink.olap_table_sink.table_id) + "/" +
+                           std::to_string(t_data_sink.olap_table_sink.txn_id) 
+ "_" +
+                           state.import_label();
+    doris::PBlock pblock;
+    auto wal_reader = WalReader(wal_path);
+    st = wal_reader.init();
+    ASSERT_TRUE(st.ok());
+    st = wal_reader.read_block(pblock);
+    ASSERT_TRUE(st.ok());
+    vectorized::Block wal_block;
+    wal_block.deserialize(pblock);
+    ASSERT_TRUE(st.ok() || st.is<ErrorCode::END_OF_FILE>());
+    ASSERT_EQ(org_block.rows() - 1, wal_block.rows());
+    for (int i = 0; i < wal_block.rows(); i++) {
+        std::string srcRow = org_block.dump_one_line(i, org_block.columns());

Review Comment:
   warning: variable 'srcRow' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
           std::string srcRow = 0 = org_block.dump_one_line(i, 
org_block.columns());
   ```
   



##########
be/test/vec/exec/vtablet_sink_test.cpp:
##########
@@ -835,5 +844,259 @@ TEST_F(VOlapTableSinkTest, decimal) {
     ASSERT_TRUE(output_set.count("(12, 12.300000000)") > 0);
     ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0);
 }
+
+TEST_F(VOlapTableSinkTest, group_commit) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;
+    auto t_data_sink = get_data_sink(&tdesc_tbl);
+
+    // crate desc_tabl
+    DescriptorTbl* desc_tbl = nullptr;
+    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    ASSERT_TRUE(st.ok());
+    state._desc_tbl = desc_tbl;
+    state._wal_id = 789;
+    state._import_label = "test";
+
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
+
+    RowDescriptor row_desc(*desc_tbl, {0}, {false});
+    service->_row_desc = &row_desc;
+    std::set<std::string> output_set;
+    service->_output_set = &output_set;
+
+    std::vector<TExpr> exprs;
+    VOlapTableSink sink(&obj_pool, row_desc, exprs, true);
+
+    // init
+    st = sink.init(t_data_sink);
+    ASSERT_TRUE(st.ok());
+    // prepare
+    st = sink.prepare(&state);
+    ASSERT_TRUE(st.ok());
+    // open
+    st = sink.open(&state);
+    ASSERT_TRUE(st.ok());
+
+    int slot_count = tuple_desc->slots().size();
+    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
+    for (int i = 0; i < slot_count; i++) {
+        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
+    }
+
+    int col_idx = 0;
+    auto* column_ptr = columns[col_idx++].get();
+    auto column_vector_int = column_ptr;
+    int int_val = 12;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 13;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 14;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_bigint = column_ptr;
+    int64_t int64_val = 9;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 25;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 50;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_str = column_ptr;
+    column_vector_str->insert_data("abc", 3);
+    column_vector_str->insert_data("abcd", 4);
+    column_vector_str->insert_data("1234567890", 10);
+
+    vectorized::Block block;
+    col_idx = 0;
+    for (const auto slot_desc : tuple_desc->slots()) {
+        
block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]),
+                                                       
slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+    vectorized::Block org_block(block);
+
+    // send
+    st = sink.send(&state, &block);
+    ASSERT_TRUE(st.ok());
+    // close
+    st = sink.close(&state, Status::OK());
+    ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close 
failed. ")
+            << st.to_string();
+
+    // each node has a eof
+    ASSERT_EQ(2, service->_eof_counters);
+    ASSERT_EQ(2 * 3, service->_row_counters);
+
+    // 2node * 2
+    ASSERT_EQ(0, state.num_rows_load_filtered());
+
+    std::string wal_path = wal_dir + "/" + 
std::to_string(t_data_sink.olap_table_sink.db_id) + "/" +
+                           
std::to_string(t_data_sink.olap_table_sink.table_id) + "/" +
+                           std::to_string(t_data_sink.olap_table_sink.txn_id) 
+ "_" +
+                           state.import_label();
+    doris::PBlock pblock;
+    auto wal_reader = WalReader(wal_path);
+    st = wal_reader.init();
+    ASSERT_TRUE(st.ok());
+    st = wal_reader.read_block(pblock);
+    ASSERT_TRUE(st.ok());
+    vectorized::Block wal_block;
+    wal_block.deserialize(pblock);
+    ASSERT_TRUE(st.ok() || st.is<ErrorCode::END_OF_FILE>());
+    ASSERT_EQ(org_block.rows(), wal_block.rows());
+    for (int i = 0; i < org_block.rows(); i++) {
+        std::string srcRow = org_block.dump_one_line(i, org_block.columns());
+        std::string walRow = wal_block.dump_one_line(i, org_block.columns());
+        ASSERT_TRUE(std::strcmp(srcRow.c_str(), walRow.c_str()) == 0);
+    }
+}
+
+TEST_F(VOlapTableSinkTest, group_commit_with_filter_row) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;

Review Comment:
   warning: variable 'options' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       brpc::ServerOptions options = 0;
   ```
   



##########
be/test/vec/exec/vtablet_sink_test.cpp:
##########
@@ -835,5 +844,259 @@ TEST_F(VOlapTableSinkTest, decimal) {
     ASSERT_TRUE(output_set.count("(12, 12.300000000)") > 0);
     ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0);
 }
+
+TEST_F(VOlapTableSinkTest, group_commit) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;
+    auto t_data_sink = get_data_sink(&tdesc_tbl);
+
+    // crate desc_tabl
+    DescriptorTbl* desc_tbl = nullptr;
+    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    ASSERT_TRUE(st.ok());
+    state._desc_tbl = desc_tbl;
+    state._wal_id = 789;
+    state._import_label = "test";
+
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
+
+    RowDescriptor row_desc(*desc_tbl, {0}, {false});
+    service->_row_desc = &row_desc;
+    std::set<std::string> output_set;
+    service->_output_set = &output_set;
+
+    std::vector<TExpr> exprs;
+    VOlapTableSink sink(&obj_pool, row_desc, exprs, true);
+
+    // init
+    st = sink.init(t_data_sink);
+    ASSERT_TRUE(st.ok());
+    // prepare
+    st = sink.prepare(&state);
+    ASSERT_TRUE(st.ok());
+    // open
+    st = sink.open(&state);
+    ASSERT_TRUE(st.ok());
+
+    int slot_count = tuple_desc->slots().size();
+    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
+    for (int i = 0; i < slot_count; i++) {
+        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
+    }
+
+    int col_idx = 0;
+    auto* column_ptr = columns[col_idx++].get();

Review Comment:
   warning: variable 'column_ptr' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       auto* column_ptr = nullptr = columns[col_idx++].get();
   ```
   



##########
be/test/vec/exec/vtablet_sink_test.cpp:
##########
@@ -835,5 +844,259 @@ TEST_F(VOlapTableSinkTest, decimal) {
     ASSERT_TRUE(output_set.count("(12, 12.300000000)") > 0);
     ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0);
 }
+
+TEST_F(VOlapTableSinkTest, group_commit) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;
+    auto t_data_sink = get_data_sink(&tdesc_tbl);
+
+    // crate desc_tabl
+    DescriptorTbl* desc_tbl = nullptr;
+    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    ASSERT_TRUE(st.ok());
+    state._desc_tbl = desc_tbl;
+    state._wal_id = 789;
+    state._import_label = "test";
+
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
+
+    RowDescriptor row_desc(*desc_tbl, {0}, {false});
+    service->_row_desc = &row_desc;
+    std::set<std::string> output_set;

Review Comment:
   warning: variable 'output_set' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       std::set<std::string> output_set = 0;
   ```
   



##########
be/test/vec/exec/vtablet_sink_test.cpp:
##########
@@ -835,5 +844,259 @@ TEST_F(VOlapTableSinkTest, decimal) {
     ASSERT_TRUE(output_set.count("(12, 12.300000000)") > 0);
     ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0);
 }
+
+TEST_F(VOlapTableSinkTest, group_commit) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;
+    auto t_data_sink = get_data_sink(&tdesc_tbl);
+
+    // crate desc_tabl
+    DescriptorTbl* desc_tbl = nullptr;
+    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    ASSERT_TRUE(st.ok());
+    state._desc_tbl = desc_tbl;
+    state._wal_id = 789;
+    state._import_label = "test";
+
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
+
+    RowDescriptor row_desc(*desc_tbl, {0}, {false});
+    service->_row_desc = &row_desc;
+    std::set<std::string> output_set;
+    service->_output_set = &output_set;
+
+    std::vector<TExpr> exprs;
+    VOlapTableSink sink(&obj_pool, row_desc, exprs, true);
+
+    // init
+    st = sink.init(t_data_sink);
+    ASSERT_TRUE(st.ok());
+    // prepare
+    st = sink.prepare(&state);
+    ASSERT_TRUE(st.ok());
+    // open
+    st = sink.open(&state);
+    ASSERT_TRUE(st.ok());
+
+    int slot_count = tuple_desc->slots().size();
+    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
+    for (int i = 0; i < slot_count; i++) {
+        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
+    }
+
+    int col_idx = 0;
+    auto* column_ptr = columns[col_idx++].get();
+    auto column_vector_int = column_ptr;
+    int int_val = 12;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 13;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 14;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_bigint = column_ptr;
+    int64_t int64_val = 9;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 25;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 50;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_str = column_ptr;
+    column_vector_str->insert_data("abc", 3);
+    column_vector_str->insert_data("abcd", 4);
+    column_vector_str->insert_data("1234567890", 10);
+
+    vectorized::Block block;
+    col_idx = 0;
+    for (const auto slot_desc : tuple_desc->slots()) {
+        
block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]),
+                                                       
slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+    vectorized::Block org_block(block);
+
+    // send
+    st = sink.send(&state, &block);
+    ASSERT_TRUE(st.ok());
+    // close
+    st = sink.close(&state, Status::OK());
+    ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close 
failed. ")
+            << st.to_string();
+
+    // each node has a eof
+    ASSERT_EQ(2, service->_eof_counters);
+    ASSERT_EQ(2 * 3, service->_row_counters);
+
+    // 2node * 2
+    ASSERT_EQ(0, state.num_rows_load_filtered());
+
+    std::string wal_path = wal_dir + "/" + 
std::to_string(t_data_sink.olap_table_sink.db_id) + "/" +

Review Comment:
   warning: variable 'wal_path' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       std::string wal_path = 0 = wal_dir + "/" + 
std::to_string(t_data_sink.olap_table_sink.db_id) + "/" +
   ```
   



##########
be/test/vec/exec/vtablet_sink_test.cpp:
##########
@@ -835,5 +844,259 @@ TEST_F(VOlapTableSinkTest, decimal) {
     ASSERT_TRUE(output_set.count("(12, 12.300000000)") > 0);
     ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0);
 }
+
+TEST_F(VOlapTableSinkTest, group_commit) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;
+    auto t_data_sink = get_data_sink(&tdesc_tbl);
+
+    // crate desc_tabl
+    DescriptorTbl* desc_tbl = nullptr;
+    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    ASSERT_TRUE(st.ok());
+    state._desc_tbl = desc_tbl;
+    state._wal_id = 789;
+    state._import_label = "test";
+
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
+
+    RowDescriptor row_desc(*desc_tbl, {0}, {false});
+    service->_row_desc = &row_desc;
+    std::set<std::string> output_set;
+    service->_output_set = &output_set;
+
+    std::vector<TExpr> exprs;
+    VOlapTableSink sink(&obj_pool, row_desc, exprs, true);
+
+    // init
+    st = sink.init(t_data_sink);
+    ASSERT_TRUE(st.ok());
+    // prepare
+    st = sink.prepare(&state);
+    ASSERT_TRUE(st.ok());
+    // open
+    st = sink.open(&state);
+    ASSERT_TRUE(st.ok());
+
+    int slot_count = tuple_desc->slots().size();
+    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
+    for (int i = 0; i < slot_count; i++) {
+        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
+    }
+
+    int col_idx = 0;
+    auto* column_ptr = columns[col_idx++].get();
+    auto column_vector_int = column_ptr;
+    int int_val = 12;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 13;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 14;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_bigint = column_ptr;
+    int64_t int64_val = 9;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 25;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 50;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_str = column_ptr;
+    column_vector_str->insert_data("abc", 3);
+    column_vector_str->insert_data("abcd", 4);
+    column_vector_str->insert_data("1234567890", 10);
+
+    vectorized::Block block;
+    col_idx = 0;
+    for (const auto slot_desc : tuple_desc->slots()) {
+        
block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]),
+                                                       
slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+    vectorized::Block org_block(block);
+
+    // send
+    st = sink.send(&state, &block);
+    ASSERT_TRUE(st.ok());
+    // close
+    st = sink.close(&state, Status::OK());
+    ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close 
failed. ")
+            << st.to_string();
+
+    // each node has a eof
+    ASSERT_EQ(2, service->_eof_counters);
+    ASSERT_EQ(2 * 3, service->_row_counters);
+
+    // 2node * 2
+    ASSERT_EQ(0, state.num_rows_load_filtered());
+
+    std::string wal_path = wal_dir + "/" + 
std::to_string(t_data_sink.olap_table_sink.db_id) + "/" +
+                           
std::to_string(t_data_sink.olap_table_sink.table_id) + "/" +
+                           std::to_string(t_data_sink.olap_table_sink.txn_id) 
+ "_" +
+                           state.import_label();
+    doris::PBlock pblock;
+    auto wal_reader = WalReader(wal_path);
+    st = wal_reader.init();
+    ASSERT_TRUE(st.ok());
+    st = wal_reader.read_block(pblock);
+    ASSERT_TRUE(st.ok());
+    vectorized::Block wal_block;
+    wal_block.deserialize(pblock);
+    ASSERT_TRUE(st.ok() || st.is<ErrorCode::END_OF_FILE>());
+    ASSERT_EQ(org_block.rows(), wal_block.rows());
+    for (int i = 0; i < org_block.rows(); i++) {
+        std::string srcRow = org_block.dump_one_line(i, org_block.columns());
+        std::string walRow = wal_block.dump_one_line(i, org_block.columns());
+        ASSERT_TRUE(std::strcmp(srcRow.c_str(), walRow.c_str()) == 0);
+    }
+}
+
+TEST_F(VOlapTableSinkTest, group_commit_with_filter_row) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;

Review Comment:
   warning: variable 'fragment_id' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       TUniqueId fragment_id = 0;
   ```
   



##########
be/test/vec/exec/vtablet_sink_test.cpp:
##########
@@ -835,5 +844,259 @@ TEST_F(VOlapTableSinkTest, decimal) {
     ASSERT_TRUE(output_set.count("(12, 12.300000000)") > 0);
     ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0);
 }
+
+TEST_F(VOlapTableSinkTest, group_commit) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;
+    auto t_data_sink = get_data_sink(&tdesc_tbl);
+
+    // crate desc_tabl
+    DescriptorTbl* desc_tbl = nullptr;
+    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    ASSERT_TRUE(st.ok());
+    state._desc_tbl = desc_tbl;
+    state._wal_id = 789;
+    state._import_label = "test";
+
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
+
+    RowDescriptor row_desc(*desc_tbl, {0}, {false});
+    service->_row_desc = &row_desc;
+    std::set<std::string> output_set;
+    service->_output_set = &output_set;
+
+    std::vector<TExpr> exprs;
+    VOlapTableSink sink(&obj_pool, row_desc, exprs, true);
+
+    // init
+    st = sink.init(t_data_sink);
+    ASSERT_TRUE(st.ok());
+    // prepare
+    st = sink.prepare(&state);
+    ASSERT_TRUE(st.ok());
+    // open
+    st = sink.open(&state);
+    ASSERT_TRUE(st.ok());
+
+    int slot_count = tuple_desc->slots().size();
+    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
+    for (int i = 0; i < slot_count; i++) {
+        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
+    }
+
+    int col_idx = 0;
+    auto* column_ptr = columns[col_idx++].get();
+    auto column_vector_int = column_ptr;
+    int int_val = 12;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 13;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 14;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_bigint = column_ptr;
+    int64_t int64_val = 9;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 25;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 50;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_str = column_ptr;
+    column_vector_str->insert_data("abc", 3);
+    column_vector_str->insert_data("abcd", 4);
+    column_vector_str->insert_data("1234567890", 10);
+
+    vectorized::Block block;
+    col_idx = 0;
+    for (const auto slot_desc : tuple_desc->slots()) {
+        
block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]),
+                                                       
slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+    vectorized::Block org_block(block);
+
+    // send
+    st = sink.send(&state, &block);
+    ASSERT_TRUE(st.ok());
+    // close
+    st = sink.close(&state, Status::OK());
+    ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close 
failed. ")
+            << st.to_string();
+
+    // each node has a eof
+    ASSERT_EQ(2, service->_eof_counters);
+    ASSERT_EQ(2 * 3, service->_row_counters);
+
+    // 2node * 2
+    ASSERT_EQ(0, state.num_rows_load_filtered());
+
+    std::string wal_path = wal_dir + "/" + 
std::to_string(t_data_sink.olap_table_sink.db_id) + "/" +
+                           
std::to_string(t_data_sink.olap_table_sink.table_id) + "/" +
+                           std::to_string(t_data_sink.olap_table_sink.txn_id) 
+ "_" +
+                           state.import_label();
+    doris::PBlock pblock;
+    auto wal_reader = WalReader(wal_path);
+    st = wal_reader.init();
+    ASSERT_TRUE(st.ok());
+    st = wal_reader.read_block(pblock);
+    ASSERT_TRUE(st.ok());
+    vectorized::Block wal_block;
+    wal_block.deserialize(pblock);
+    ASSERT_TRUE(st.ok() || st.is<ErrorCode::END_OF_FILE>());
+    ASSERT_EQ(org_block.rows(), wal_block.rows());
+    for (int i = 0; i < org_block.rows(); i++) {
+        std::string srcRow = org_block.dump_one_line(i, org_block.columns());
+        std::string walRow = wal_block.dump_one_line(i, org_block.columns());

Review Comment:
   warning: variable 'walRow' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
           std::string walRow = 0 = wal_block.dump_one_line(i, 
org_block.columns());
   ```
   



##########
be/test/vec/exec/vtablet_sink_test.cpp:
##########
@@ -835,5 +844,259 @@ TEST_F(VOlapTableSinkTest, decimal) {
     ASSERT_TRUE(output_set.count("(12, 12.300000000)") > 0);
     ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0);
 }
+
+TEST_F(VOlapTableSinkTest, group_commit) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;
+    auto t_data_sink = get_data_sink(&tdesc_tbl);
+
+    // crate desc_tabl
+    DescriptorTbl* desc_tbl = nullptr;
+    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    ASSERT_TRUE(st.ok());
+    state._desc_tbl = desc_tbl;
+    state._wal_id = 789;
+    state._import_label = "test";
+
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
+
+    RowDescriptor row_desc(*desc_tbl, {0}, {false});
+    service->_row_desc = &row_desc;
+    std::set<std::string> output_set;
+    service->_output_set = &output_set;
+
+    std::vector<TExpr> exprs;
+    VOlapTableSink sink(&obj_pool, row_desc, exprs, true);
+
+    // init
+    st = sink.init(t_data_sink);
+    ASSERT_TRUE(st.ok());
+    // prepare
+    st = sink.prepare(&state);
+    ASSERT_TRUE(st.ok());
+    // open
+    st = sink.open(&state);
+    ASSERT_TRUE(st.ok());
+
+    int slot_count = tuple_desc->slots().size();
+    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
+    for (int i = 0; i < slot_count; i++) {
+        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
+    }
+
+    int col_idx = 0;
+    auto* column_ptr = columns[col_idx++].get();
+    auto column_vector_int = column_ptr;
+    int int_val = 12;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 13;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 14;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_bigint = column_ptr;
+    int64_t int64_val = 9;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 25;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 50;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_str = column_ptr;
+    column_vector_str->insert_data("abc", 3);
+    column_vector_str->insert_data("abcd", 4);
+    column_vector_str->insert_data("1234567890", 10);
+
+    vectorized::Block block;
+    col_idx = 0;
+    for (const auto slot_desc : tuple_desc->slots()) {
+        
block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]),
+                                                       
slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+    vectorized::Block org_block(block);
+
+    // send
+    st = sink.send(&state, &block);
+    ASSERT_TRUE(st.ok());
+    // close
+    st = sink.close(&state, Status::OK());
+    ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close 
failed. ")
+            << st.to_string();
+
+    // each node has a eof
+    ASSERT_EQ(2, service->_eof_counters);
+    ASSERT_EQ(2 * 3, service->_row_counters);
+
+    // 2node * 2
+    ASSERT_EQ(0, state.num_rows_load_filtered());
+
+    std::string wal_path = wal_dir + "/" + 
std::to_string(t_data_sink.olap_table_sink.db_id) + "/" +
+                           
std::to_string(t_data_sink.olap_table_sink.table_id) + "/" +
+                           std::to_string(t_data_sink.olap_table_sink.txn_id) 
+ "_" +
+                           state.import_label();
+    doris::PBlock pblock;
+    auto wal_reader = WalReader(wal_path);
+    st = wal_reader.init();
+    ASSERT_TRUE(st.ok());
+    st = wal_reader.read_block(pblock);
+    ASSERT_TRUE(st.ok());
+    vectorized::Block wal_block;
+    wal_block.deserialize(pblock);
+    ASSERT_TRUE(st.ok() || st.is<ErrorCode::END_OF_FILE>());
+    ASSERT_EQ(org_block.rows(), wal_block.rows());
+    for (int i = 0; i < org_block.rows(); i++) {
+        std::string srcRow = org_block.dump_one_line(i, org_block.columns());
+        std::string walRow = wal_block.dump_one_line(i, org_block.columns());
+        ASSERT_TRUE(std::strcmp(srcRow.c_str(), walRow.c_str()) == 0);
+    }
+}
+
+TEST_F(VOlapTableSinkTest, group_commit_with_filter_row) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;
+    auto t_data_sink = get_data_sink(&tdesc_tbl);
+
+    // crate desc_tabl
+    DescriptorTbl* desc_tbl = nullptr;
+    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    ASSERT_TRUE(st.ok());
+    state._desc_tbl = desc_tbl;
+    state._wal_id = 789;
+    state._import_label = "test";
+
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
+
+    RowDescriptor row_desc(*desc_tbl, {0}, {false});
+    service->_row_desc = &row_desc;
+    std::set<std::string> output_set;
+    service->_output_set = &output_set;
+
+    std::vector<TExpr> exprs;
+    VOlapTableSink sink(&obj_pool, row_desc, exprs, true);
+
+    // init
+    st = sink.init(t_data_sink);
+    ASSERT_TRUE(st.ok());
+    // prepare
+    st = sink.prepare(&state);
+    ASSERT_TRUE(st.ok());
+    // open
+    st = sink.open(&state);
+    ASSERT_TRUE(st.ok());
+
+    int slot_count = tuple_desc->slots().size();
+    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
+    for (int i = 0; i < slot_count; i++) {
+        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
+    }
+
+    int col_idx = 0;
+    auto* column_ptr = columns[col_idx++].get();
+    auto column_vector_int = column_ptr;
+    int int_val = 12;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 13;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 14;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_bigint = column_ptr;
+    int64_t int64_val = 9;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 25;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 50;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_str = column_ptr;
+    column_vector_str->insert_data("abc", 3);
+    column_vector_str->insert_data("abcd", 4);
+    column_vector_str->insert_data("abcde1234567890", 15);
+
+    vectorized::Block block;
+    col_idx = 0;
+    for (const auto slot_desc : tuple_desc->slots()) {
+        
block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]),
+                                                       
slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+    vectorized::Block org_block(block);
+
+    // send
+    st = sink.send(&state, &block);
+    ASSERT_TRUE(st.ok());
+    // close
+    st = sink.close(&state, Status::OK());
+    ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close 
failed. ")
+            << st.to_string();
+
+    // each node has a eof
+    ASSERT_EQ(2, service->_eof_counters);
+    ASSERT_EQ(2 * 2, service->_row_counters);
+
+    // 2node * 2
+    ASSERT_EQ(1, state.num_rows_load_filtered());
+
+    std::string wal_path = wal_dir + "/" + 
std::to_string(t_data_sink.olap_table_sink.db_id) + "/" +
+                           
std::to_string(t_data_sink.olap_table_sink.table_id) + "/" +
+                           std::to_string(t_data_sink.olap_table_sink.txn_id) 
+ "_" +
+                           state.import_label();
+    doris::PBlock pblock;
+    auto wal_reader = WalReader(wal_path);
+    st = wal_reader.init();
+    ASSERT_TRUE(st.ok());
+    st = wal_reader.read_block(pblock);
+    ASSERT_TRUE(st.ok());
+    vectorized::Block wal_block;
+    wal_block.deserialize(pblock);
+    ASSERT_TRUE(st.ok() || st.is<ErrorCode::END_OF_FILE>());
+    ASSERT_EQ(org_block.rows() - 1, wal_block.rows());
+    for (int i = 0; i < wal_block.rows(); i++) {
+        std::string srcRow = org_block.dump_one_line(i, org_block.columns());
+        std::string walRow = wal_block.dump_one_line(i, org_block.columns());

Review Comment:
   warning: variable 'walRow' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
           std::string walRow = 0 = wal_block.dump_one_line(i, 
org_block.columns());
   ```
   



##########
be/test/vec/exec/vtablet_sink_test.cpp:
##########
@@ -835,5 +844,259 @@ TEST_F(VOlapTableSinkTest, decimal) {
     ASSERT_TRUE(output_set.count("(12, 12.300000000)") > 0);
     ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0);
 }
+
+TEST_F(VOlapTableSinkTest, group_commit) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;
+    auto t_data_sink = get_data_sink(&tdesc_tbl);
+
+    // crate desc_tabl
+    DescriptorTbl* desc_tbl = nullptr;
+    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    ASSERT_TRUE(st.ok());
+    state._desc_tbl = desc_tbl;
+    state._wal_id = 789;
+    state._import_label = "test";
+
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
+
+    RowDescriptor row_desc(*desc_tbl, {0}, {false});
+    service->_row_desc = &row_desc;
+    std::set<std::string> output_set;
+    service->_output_set = &output_set;
+
+    std::vector<TExpr> exprs;
+    VOlapTableSink sink(&obj_pool, row_desc, exprs, true);
+
+    // init
+    st = sink.init(t_data_sink);
+    ASSERT_TRUE(st.ok());
+    // prepare
+    st = sink.prepare(&state);
+    ASSERT_TRUE(st.ok());
+    // open
+    st = sink.open(&state);
+    ASSERT_TRUE(st.ok());
+
+    int slot_count = tuple_desc->slots().size();
+    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
+    for (int i = 0; i < slot_count; i++) {
+        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
+    }
+
+    int col_idx = 0;
+    auto* column_ptr = columns[col_idx++].get();
+    auto column_vector_int = column_ptr;
+    int int_val = 12;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 13;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+    int_val = 14;
+    column_vector_int->insert_data((const char*)&int_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_bigint = column_ptr;
+    int64_t int64_val = 9;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 25;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+    int64_val = 50;
+    column_vector_bigint->insert_data((const char*)&int64_val, 0);
+
+    column_ptr = columns[col_idx++].get();
+    auto column_vector_str = column_ptr;
+    column_vector_str->insert_data("abc", 3);
+    column_vector_str->insert_data("abcd", 4);
+    column_vector_str->insert_data("1234567890", 10);
+
+    vectorized::Block block;
+    col_idx = 0;
+    for (const auto slot_desc : tuple_desc->slots()) {
+        
block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]),
+                                                       
slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+    vectorized::Block org_block(block);
+
+    // send
+    st = sink.send(&state, &block);
+    ASSERT_TRUE(st.ok());
+    // close
+    st = sink.close(&state, Status::OK());
+    ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close 
failed. ")
+            << st.to_string();
+
+    // each node has a eof
+    ASSERT_EQ(2, service->_eof_counters);
+    ASSERT_EQ(2 * 3, service->_row_counters);
+
+    // 2node * 2
+    ASSERT_EQ(0, state.num_rows_load_filtered());
+
+    std::string wal_path = wal_dir + "/" + 
std::to_string(t_data_sink.olap_table_sink.db_id) + "/" +
+                           
std::to_string(t_data_sink.olap_table_sink.table_id) + "/" +
+                           std::to_string(t_data_sink.olap_table_sink.txn_id) 
+ "_" +
+                           state.import_label();
+    doris::PBlock pblock;
+    auto wal_reader = WalReader(wal_path);
+    st = wal_reader.init();
+    ASSERT_TRUE(st.ok());
+    st = wal_reader.read_block(pblock);
+    ASSERT_TRUE(st.ok());
+    vectorized::Block wal_block;
+    wal_block.deserialize(pblock);
+    ASSERT_TRUE(st.ok() || st.is<ErrorCode::END_OF_FILE>());
+    ASSERT_EQ(org_block.rows(), wal_block.rows());
+    for (int i = 0; i < org_block.rows(); i++) {
+        std::string srcRow = org_block.dump_one_line(i, org_block.columns());
+        std::string walRow = wal_block.dump_one_line(i, org_block.columns());
+        ASSERT_TRUE(std::strcmp(srcRow.c_str(), walRow.c_str()) == 0);
+    }
+}
+
+TEST_F(VOlapTableSinkTest, group_commit_with_filter_row) {
+    // start brpc service first
+    _server = new brpc::Server();
+    auto service = new VTestInternalService();
+    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
+    brpc::ServerOptions options;
+    {
+        debug::ScopedLeakCheckDisabler disable_lsan;
+        _server->Start(4356, &options);
+    }
+
+    TUniqueId fragment_id;
+    TQueryOptions query_options;
+    query_options.batch_size = 1;
+    query_options.be_exec_version = 0;
+    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
+    state.init_mem_trackers(TUniqueId());
+
+    ObjectPool obj_pool;
+    TDescriptorTable tdesc_tbl;
+    auto t_data_sink = get_data_sink(&tdesc_tbl);
+
+    // crate desc_tabl
+    DescriptorTbl* desc_tbl = nullptr;
+    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    ASSERT_TRUE(st.ok());
+    state._desc_tbl = desc_tbl;
+    state._wal_id = 789;
+    state._import_label = "test";
+
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
+
+    RowDescriptor row_desc(*desc_tbl, {0}, {false});
+    service->_row_desc = &row_desc;
+    std::set<std::string> output_set;
+    service->_output_set = &output_set;
+
+    std::vector<TExpr> exprs;
+    VOlapTableSink sink(&obj_pool, row_desc, exprs, true);
+
+    // init
+    st = sink.init(t_data_sink);
+    ASSERT_TRUE(st.ok());
+    // prepare
+    st = sink.prepare(&state);
+    ASSERT_TRUE(st.ok());
+    // open
+    st = sink.open(&state);
+    ASSERT_TRUE(st.ok());
+
+    int slot_count = tuple_desc->slots().size();
+    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
+    for (int i = 0; i < slot_count; i++) {
+        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
+    }
+
+    int col_idx = 0;
+    auto* column_ptr = columns[col_idx++].get();

Review Comment:
   warning: variable 'column_ptr' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       auto* column_ptr = nullptr = columns[col_idx++].get();
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to