This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ffd9da44a2a1803bcab1d2c4895eea9eed9f6e44
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Fri Apr 19 10:13:18 2024 +0800

    [fix](move-memtable) fix commit may fail due to duplicated reports (#32403)
---
 be/src/vec/sink/load_stream_stub.h                 |  12 ++
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |  86 +++++---
 be/src/vec/sink/writer/vtablet_writer_v2.h         |   7 +
 be/test/vec/sink/vtablet_writer_v2_test.cpp        | 239 +++++++++++++++++++++
 .../test_commit_info_fault_injection.groovy        |  99 +++++++++
 5 files changed, 414 insertions(+), 29 deletions(-)

diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index 5ebec9f9d78..8ef40b84145 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -200,6 +200,18 @@ public:
 
     std::string to_string();
 
+    // Test-only hook: appends `tablet_id` to the list of tablets this stub reports as success.
+    void add_success_tablet(int64_t tablet_id) {
+        std::lock_guard<bthread::Mutex> guard(_success_tablets_mutex);
+        _success_tablets.emplace_back(tablet_id);
+    }
+
+    // Test-only hook: records `reason` as the failure status of `tablet_id` on this stub,
+    // replacing any reason stored earlier for the same tablet.
+    void add_failed_tablet(int64_t tablet_id, Status reason) {
+        std::lock_guard<bthread::Mutex> guard(_failed_tablets_mutex);
+        auto& stored_reason = _failed_tablets[tablet_id];
+        stored_reason = reason;
+    }
+
 private:
     Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data 
= {});
     Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 1f1756b5a16..21a87c150b8 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -574,39 +574,28 @@ Status VTabletWriterV2::close(Status exec_status) {
 
         // calculate and submit commit info
         if (is_last_sink) {
-            std::unordered_map<int64_t, int> failed_tablets;
-            std::unordered_map<int64_t, Status> failed_reason;
-            std::vector<TTabletCommitInfo> tablet_commit_infos;
-
-            _load_stream_map->for_each([&](int64_t dst_id, const Streams& 
streams) {
-                std::unordered_set<int64_t> known_tablets;
-                for (const auto& stream : streams) {
-                    for (auto [tablet_id, reason] : stream->failed_tablets()) {
-                        if (known_tablets.contains(tablet_id)) {
-                            continue;
-                        }
-                        known_tablets.insert(tablet_id);
-                        failed_tablets[tablet_id]++;
-                        failed_reason[tablet_id] = reason;
-                    }
-                    for (auto tablet_id : stream->success_tablets()) {
-                        if (known_tablets.contains(tablet_id)) {
-                            continue;
-                        }
-                        known_tablets.insert(tablet_id);
-                        TTabletCommitInfo commit_info;
-                        commit_info.tabletId = tablet_id;
-                        commit_info.backendId = dst_id;
-                        
tablet_commit_infos.emplace_back(std::move(commit_info));
+            DBUG_EXECUTE_IF("VTabletWriterV2.close.add_failed_tablet", {
+                auto streams = 
_load_stream_map->at(_tablets_for_node.begin()->first);
+                int64_t tablet_id = -1;
+                for (auto& stream : *streams) {
+                    const auto& tablets = stream->success_tablets();
+                    if (tablets.size() > 0) {
+                        tablet_id = tablets[0];
+                        break;
                     }
                 }
+                if (tablet_id != -1) {
+                    LOG(INFO) << "fault injection: adding failed tablet_id: " 
<< tablet_id;
+                    streams->front()->add_failed_tablet(tablet_id,
+                                                        
Status::InternalError("fault injection"));
+                } else {
+                    LOG(INFO) << "fault injection: failed to inject failed 
tablet_id";
+                }
             });
 
-            for (auto [tablet_id, replicas] : failed_tablets) {
-                if (replicas > (_num_replicas - 1) / 2) {
-                    return failed_reason.at(tablet_id);
-                }
-            }
+            std::vector<TTabletCommitInfo> tablet_commit_infos;
+            RETURN_IF_ERROR(
+                    _create_commit_info(tablet_commit_infos, _load_stream_map, 
_num_replicas));
             _state->tablet_commit_infos().insert(
                     _state->tablet_commit_infos().end(),
                     std::make_move_iterator(tablet_commit_infos.begin()),
@@ -659,4 +648,43 @@ void VTabletWriterV2::_calc_tablets_to_commit() {
     }
 }
 
+// Builds the commit info list from the tablet reports held by the load streams and decides
+// whether the load can still be committed.
+//
+// The callback given to LoadStreamMap::for_each receives a destination id and that
+// destination's streams. Within one callback invocation a tablet id is handled at most once,
+// whichever stream reports it: the first report encountered wins, and a stream's failed
+// reports are read before its success reports.
+//
+// @param tablet_commit_infos  output; one entry (tabletId, backendId = destination id) is
+//                             appended for every tablet accepted as success. Entries may
+//                             already have been appended when an error is returned.
+// @param load_stream_map      streams to collect the reports from.
+// @param num_replicas         replica count used for the majority check.
+// @return the recorded failure reason of a tablet whose failure count exceeds
+//         (num_replicas - 1) / 2, otherwise Status::OK().
+Status VTabletWriterV2::_create_commit_info(std::vector<TTabletCommitInfo>& tablet_commit_infos,
+                                            std::shared_ptr<LoadStreamMap> load_stream_map,
+                                            int num_replicas) {
+    std::unordered_map<int64_t, int> failed_tablets;
+    std::unordered_map<int64_t, Status> failed_reason;
+    load_stream_map->for_each([&](int64_t dst_id, const Streams& streams) {
+        // Tablet ids already accounted for in this callback invocation.
+        std::unordered_set<int64_t> known_tablets;
+        for (const auto& stream : streams) {
+            // Bind by reference so the Status is not copied for every entry.
+            for (const auto& [tablet_id, reason] : stream->failed_tablets()) {
+                // insert() reports whether the id was new, so one lookup is enough.
+                if (!known_tablets.insert(tablet_id).second) {
+                    continue;
+                }
+                failed_tablets[tablet_id]++;
+                failed_reason[tablet_id] = reason;
+            }
+            for (auto tablet_id : stream->success_tablets()) {
+                if (!known_tablets.insert(tablet_id).second) {
+                    continue;
+                }
+                TTabletCommitInfo commit_info;
+                commit_info.tabletId = tablet_id;
+                commit_info.backendId = dst_id;
+                tablet_commit_infos.emplace_back(std::move(commit_info));
+            }
+        }
+    });
+
+    for (const auto& [tablet_id, replicas] : failed_tablets) {
+        if (replicas > (num_replicas - 1) / 2) {
+            // Every id counted in failed_tablets also has a reason stored, so at() cannot throw;
+            // look it up once and use it for both the log line and the return value.
+            const Status& reason = failed_reason.at(tablet_id);
+            LOG(INFO) << "tablet " << tablet_id << " failed on majority backends: " << reason;
+            return reason;
+        }
+    }
+    return Status::OK();
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 20952229bbb..e3d31fb32b9 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -114,6 +114,13 @@ public:
 
     Status on_partitions_created(TCreatePartitionResult* result);
 
+#ifndef BE_TEST // in BE_TEST builds the access specifier below is omitted, so the helper keeps the preceding section's access and the unit test can call it directly
+private:
+#endif
+    static Status _create_commit_info(std::vector<TTabletCommitInfo>& 
tablet_commit_infos,
+                                      std::shared_ptr<LoadStreamMap> 
load_stream_map,
+                                      int num_replicas); // appends commit infos for tablets reported as success; returns a non-OK Status when a tablet failed on too many replicas
+
 private:
     Status _init_row_distribution();
 
diff --git a/be/test/vec/sink/vtablet_writer_v2_test.cpp b/be/test/vec/sink/vtablet_writer_v2_test.cpp
new file mode 100644
index 00000000000..6289896c75f
--- /dev/null
+++ b/be/test/vec/sink/vtablet_writer_v2_test.cpp
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/writer/vtablet_writer_v2.h"
+
+#include <gtest/gtest.h>
+
+#include "vec/sink/load_stream_map_pool.h"
+#include "vec/sink/load_stream_stub.h"
+
+namespace doris {
+
+// Fixture for the VTabletWriterV2::_create_commit_info tests. It holds no state; the suite-level
+// hooks are intentionally empty.
+class TestVTabletWriterV2 : public ::testing::Test {
+public:
+    TestVTabletWriterV2() = default;
+    // Overrides the virtual destructor of ::testing::Test; `override` lets the compiler check it.
+    ~TestVTabletWriterV2() override = default;
+    static void SetUpTestSuite() {}
+    static void TearDownTestSuite() {}
+};
+
+// Value passed as the src_id argument of every LoadStreamMap built by the tests below.
+constexpr int64_t src_id = 1000;
+
+// Fetches (or creates) the streams of `node_id` in `load_stream_map` and records the given
+// success and failure reports on the stream at index 0.
+static void add_stream(std::shared_ptr<LoadStreamMap> load_stream_map, int64_t node_id,
+                       std::vector<int64_t> success_tablets,
+                       std::unordered_map<int64_t, Status> failed_tablets) {
+    auto streams = load_stream_map->get_or_create(node_id);
+    for (int64_t ok_tablet : success_tablets) {
+        streams->at(0)->add_success_tablet(ok_tablet);
+    }
+    for (const auto& entry : failed_tablets) {
+        streams->at(0)->add_failed_tablet(entry.first, entry.second);
+    }
+}
+
+// One replica; node 1001 reports tablets 1 and 2 as success: two commit infos are produced.
+TEST_F(TestVTabletWriterV2, one_replica) {
+    constexpr int kNumReplicas = 1;
+    UniqueId load_id;
+    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
+    add_stream(stream_map, 1001, {1, 2}, {});
+
+    std::vector<TTabletCommitInfo> commit_infos;
+    const auto status = vectorized::VTabletWriterV2::_create_commit_info(commit_infos, stream_map,
+                                                                         kNumReplicas);
+    ASSERT_TRUE(status.ok());
+    ASSERT_EQ(commit_infos.size(), 2);
+}
+
+// One replica; node 1001 reports tablet 1 as success and tablet 2 as failed: the failure reason
+// of tablet 2 is returned.
+TEST_F(TestVTabletWriterV2, one_replica_fail) {
+    constexpr int kNumReplicas = 1;
+    UniqueId load_id;
+    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
+    add_stream(stream_map, 1001, {1}, {{2, Status::InternalError("test")}});
+
+    std::vector<TTabletCommitInfo> commit_infos;
+    const auto status = vectorized::VTabletWriterV2::_create_commit_info(commit_infos, stream_map,
+                                                                         kNumReplicas);
+    ASSERT_EQ(status, Status::InternalError("test"));
+}
+
+// Two replicas; nodes 1001 and 1002 both report tablets 1 and 2 as success: four commit infos.
+TEST_F(TestVTabletWriterV2, two_replica) {
+    constexpr int kNumReplicas = 2;
+    UniqueId load_id;
+    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
+    add_stream(stream_map, 1001, {1, 2}, {});
+    add_stream(stream_map, 1002, {1, 2}, {});
+
+    std::vector<TTabletCommitInfo> commit_infos;
+    const auto status = vectorized::VTabletWriterV2::_create_commit_info(commit_infos, stream_map,
+                                                                         kNumReplicas);
+    ASSERT_TRUE(status.ok());
+    ASSERT_EQ(commit_infos.size(), 4);
+}
+
+// Two replicas; tablet 2 fails on node 1001 and succeeds on node 1002: with two replicas a
+// single failure already exceeds (num_replicas - 1) / 2, so the failure reason is returned.
+TEST_F(TestVTabletWriterV2, two_replica_fail) {
+    constexpr int kNumReplicas = 2;
+    UniqueId load_id;
+    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
+    add_stream(stream_map, 1001, {1}, {{2, Status::InternalError("test")}});
+    add_stream(stream_map, 1002, {1, 2}, {});
+
+    std::vector<TTabletCommitInfo> commit_infos;
+    const auto status = vectorized::VTabletWriterV2::_create_commit_info(commit_infos, stream_map,
+                                                                         kNumReplicas);
+    ASSERT_EQ(status, Status::InternalError("test"));
+}
+
+// Three replicas; nodes 1001, 1002 and 1003 all report tablets 1 and 2 as success: six commit
+// infos.
+TEST_F(TestVTabletWriterV2, normal) {
+    constexpr int kNumReplicas = 3;
+    UniqueId load_id;
+    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
+    add_stream(stream_map, 1001, {1, 2}, {});
+    add_stream(stream_map, 1002, {1, 2}, {});
+    add_stream(stream_map, 1003, {1, 2}, {});
+
+    std::vector<TTabletCommitInfo> commit_infos;
+    const auto status = vectorized::VTabletWriterV2::_create_commit_info(commit_infos, stream_map,
+                                                                         kNumReplicas);
+    ASSERT_TRUE(status.ok());
+    ASSERT_EQ(commit_infos.size(), 6);
+}
+
+// Three replicas; node 1002 reports nothing for tablet 2: the call still succeeds and yields
+// five commit infos.
+TEST_F(TestVTabletWriterV2, miss_one) {
+    constexpr int kNumReplicas = 3;
+    UniqueId load_id;
+    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
+    add_stream(stream_map, 1001, {1, 2}, {});
+    add_stream(stream_map, 1002, {1}, {});
+    add_stream(stream_map, 1003, {1, 2}, {});
+
+    std::vector<TTabletCommitInfo> commit_infos;
+    const auto status = vectorized::VTabletWriterV2::_create_commit_info(commit_infos, stream_map,
+                                                                         kNumReplicas);
+    ASSERT_TRUE(status.ok());
+    ASSERT_EQ(commit_infos.size(), 5);
+}
+
+// Three replicas; nodes 1002 and 1003 report nothing for tablet 2. Missing reports are not
+// counted as failures, so the call succeeds with four commit infos.
+TEST_F(TestVTabletWriterV2, miss_two) {
+    constexpr int kNumReplicas = 3;
+    UniqueId load_id;
+    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
+    add_stream(stream_map, 1001, {1, 2}, {});
+    add_stream(stream_map, 1002, {1}, {});
+    add_stream(stream_map, 1003, {1}, {});
+
+    std::vector<TTabletCommitInfo> commit_infos;
+    const auto status = vectorized::VTabletWriterV2::_create_commit_info(commit_infos, stream_map,
+                                                                         kNumReplicas);
+    ASSERT_TRUE(status.ok());
+    ASSERT_EQ(commit_infos.size(), 4);
+}
+
+// Three replicas; tablet 2 fails on node 1002 only: one failure does not exceed
+// (num_replicas - 1) / 2, so the call succeeds with five commit infos.
+TEST_F(TestVTabletWriterV2, fail_one) {
+    constexpr int kNumReplicas = 3;
+    UniqueId load_id;
+    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
+    add_stream(stream_map, 1001, {1, 2}, {});
+    add_stream(stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
+    add_stream(stream_map, 1003, {1, 2}, {});
+
+    std::vector<TTabletCommitInfo> commit_infos;
+    const auto status = vectorized::VTabletWriterV2::_create_commit_info(commit_infos, stream_map,
+                                                                         kNumReplicas);
+    ASSERT_TRUE(status.ok());
+    ASSERT_EQ(commit_infos.size(), 5);
+}
+
+// Three replicas; node 1002 gets the same reports added twice (tablet 1 success, tablet 2
+// failed). Duplicate tablets from the same node must be ignored, so the result matches the
+// single-failure case: success with five commit infos.
+TEST_F(TestVTabletWriterV2, fail_one_duplicate) {
+    constexpr int kNumReplicas = 3;
+    UniqueId load_id;
+    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
+    add_stream(stream_map, 1001, {1, 2}, {});
+    add_stream(stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
+    add_stream(stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
+    add_stream(stream_map, 1003, {1, 2}, {});
+
+    std::vector<TTabletCommitInfo> commit_infos;
+    const auto status = vectorized::VTabletWriterV2::_create_commit_info(commit_infos, stream_map,
+                                                                         kNumReplicas);
+    ASSERT_TRUE(status.ok());
+    ASSERT_EQ(commit_infos.size(), 5);
+}
+
+// Three replicas; node 1002 fails both tablets 1 and 2. Each tablet has only one failure, so
+// the call succeeds with the four commit infos from nodes 1001 and 1003.
+TEST_F(TestVTabletWriterV2, fail_two_diff_tablet_same_node) {
+    constexpr int kNumReplicas = 3;
+    UniqueId load_id;
+    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
+    add_stream(stream_map, 1001, {1, 2}, {});
+    add_stream(stream_map, 1002, {},
+               {{1, Status::InternalError("test")}, {2, Status::InternalError("test")}});
+    add_stream(stream_map, 1003, {1, 2}, {});
+
+    std::vector<TTabletCommitInfo> commit_infos;
+    const auto status = vectorized::VTabletWriterV2::_create_commit_info(commit_infos, stream_map,
+                                                                         kNumReplicas);
+    ASSERT_TRUE(status.ok());
+    ASSERT_EQ(commit_infos.size(), 4);
+}
+
+// Three replicas; tablet 2 fails on node 1002 and tablet 1 fails on node 1003. Each tablet has
+// only one failure, so the call succeeds with four commit infos.
+TEST_F(TestVTabletWriterV2, fail_two_diff_tablet_diff_node) {
+    constexpr int kNumReplicas = 3;
+    UniqueId load_id;
+    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
+    add_stream(stream_map, 1001, {1, 2}, {});
+    add_stream(stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
+    add_stream(stream_map, 1003, {2}, {{1, Status::InternalError("test")}});
+
+    std::vector<TTabletCommitInfo> commit_infos;
+    const auto status = vectorized::VTabletWriterV2::_create_commit_info(commit_infos, stream_map,
+                                                                         kNumReplicas);
+    ASSERT_TRUE(status.ok());
+    ASSERT_EQ(commit_infos.size(), 4);
+}
+
+// Three replicas; tablet 2 fails on nodes 1002 and 1003. BE should detect that the majority of
+// replicas failed and abort the commit by returning the failure reason.
+TEST_F(TestVTabletWriterV2, fail_two_same_tablet) {
+    constexpr int kNumReplicas = 3;
+    UniqueId load_id;
+    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
+    add_stream(stream_map, 1001, {1, 2}, {});
+    add_stream(stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
+    add_stream(stream_map, 1003, {1}, {{2, Status::InternalError("test")}});
+
+    std::vector<TTabletCommitInfo> commit_infos;
+    const auto status = vectorized::VTabletWriterV2::_create_commit_info(commit_infos, stream_map,
+                                                                         kNumReplicas);
+    ASSERT_EQ(status, Status::InternalError("test"));
+}
+
+// Three replicas; tablet 2 fails on nodes 1002 and 1003 and is not reported at all by node 1001.
+// BE should detect that the majority of replicas failed and abort the commit.
+TEST_F(TestVTabletWriterV2, fail_two_miss_one_same_tablet) {
+    constexpr int kNumReplicas = 3;
+    UniqueId load_id;
+    auto stream_map = std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
+    add_stream(stream_map, 1001, {1}, {});
+    add_stream(stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
+    add_stream(stream_map, 1003, {1}, {{2, Status::InternalError("test")}});
+
+    std::vector<TTabletCommitInfo> commit_infos;
+    const auto status = vectorized::VTabletWriterV2::_create_commit_info(commit_infos, stream_map,
+                                                                         kNumReplicas);
+    ASSERT_EQ(status, Status::InternalError("test"));
+}
+
+} // namespace doris
diff --git a/regression-test/suites/fault_injection_p0/test_commit_info_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_commit_info_fault_injection.groovy
new file mode 100644
index 00000000000..be769149df4
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_commit_info_fault_injection.groovy
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_commit_info_fault_injection", "nonConcurrent") { // runs an insert with the VTabletWriterV2.close.add_failed_tablet debug point enabled
+    def res = sql"show backends;"
+    logger.info(res.toString())
+    def beNums = 0; // number of rows returned by "show backends"
+    res.each { item ->
+        beNums++;
+        logger.info(item.toString())
+    }
+    if (beNums == 3){ // the scenario below only runs when exactly three backends are listed; the tables are created with replication_num = 3
+        result = sql "show VARIABLES like \'enable_memtable_on_sink_node\'"
+        log.info(result.toString())
+        original_status = result[0][1]       // remembered so the variable can be set back at the end of the suite
+
+        sql """ set enable_memtable_on_sink_node=true """
+        sql """
+            CREATE TABLE IF NOT EXISTS `baseall` (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = 
"3")
+            """
+        sql """
+            CREATE TABLE IF NOT EXISTS `test` (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace_if_not_null null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = 
"3")
+            """
+
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        streamLoad {
+            table "baseall"
+            db "regression_test_fault_injection_p0"
+            set 'column_separator', ','
+            file "baseall.txt"
+        }
+
+        def load_with_injection = { injection, error_msg-> // enables the debug point, runs the insert, and always disables the debug point again
+            try {
+                GetDebugPoint().enableDebugPointForAllBEs(injection)
+                sql "insert into test select * from baseall where k1 <= 3"
+            } catch(Exception e) {
+                logger.info(e.getMessage())
+                assertTrue(e.getMessage().contains(error_msg)) // only reached when the insert throws
+            } finally {
+                GetDebugPoint().disableDebugPointForAllBEs(injection)
+            }
+        }
+
+        // One replica commit failed, load should succeed. NOTE(review): the second argument is only compared with the exception message if the insert throws.
+        load_with_injection("VTabletWriterV2.close.add_failed_tablet", 
"sucess")
+
+        sql """set enable_memtable_on_sink_node = ${original_status}"""
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to