This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e260a1b328 [fix](group commit) fix lost row when prepared stmt reused plan shares one load_id (#64362)
0e260a1b328 is described below

commit 0e260a1b328ae67a4be3ab85d1063783d414d70a
Author: meiyi <[email protected]>
AuthorDate: Tue Jun 30 10:30:42 2026 +0800

    [fix](group commit) fix lost row when prepared stmt reused plan shares one load_id (#64362)
    
    1. When several inserts reuse one server-side prepared group commit
    plan, they share a single load_id X
    (query_id/load_id/fragment_instance_id are baked into the reused
    serialized plan).
    2. The BE uses X as the per-queue registration key in
    LoadBlockQueue::_load_ids_to_write_dep, and remove_load_id(X) is issued
    from two sites: wind_up() (synchronous, before the RPC returns and
    correctly ordered by FE serialization) and
    ~GroupCommitBlockSinkLocalState (async fragment teardown, after the RPC
    has returned).
    3. FE serialization does not order one load's async teardown remove(X)
    against the next load's add_load_id(X), so the teardown can erase the
    next load's in-flight registration; the consumer then sees no writer in
    flight, commits the queue early, and the next load's add_block lands on
    the already-committed queue and is silently lost while FE reports
    success.
    
    Fix: track whether wind_up() already removed the load_id
    (_load_id_removed) and suppress the redundant remove in the destructor
    on the normal path, keeping it only as a fallback for abnormal
    (cancel/error) paths where wind_up did not run.
---
 .../operator/group_commit_block_sink_operator.cpp  |  24 +++-
 .../operator/group_commit_block_sink_operator.h    |   4 +
 be/src/load/group_commit/group_commit_mgr.cpp      |  15 ++
 be/src/load/group_commit/group_commit_mgr.h        |   4 +
 .../test_group_commit_prepare_lost_row.groovy      | 157 +++++++++++++++++++++
 5 files changed, 202 insertions(+), 2 deletions(-)

diff --git a/be/src/exec/operator/group_commit_block_sink_operator.cpp b/be/src/exec/operator/group_commit_block_sink_operator.cpp
index 38ea6d3c18a..e441882f73b 100644
--- a/be/src/exec/operator/group_commit_block_sink_operator.cpp
+++ b/be/src/exec/operator/group_commit_block_sink_operator.cpp
@@ -19,15 +19,33 @@
 
 #include <gen_cpp/DataSinks_types.h>
 
+#include <chrono>
+#include <thread>
+
 #include "exec/sink/vtablet_block_convertor.h"
 #include "load/group_commit/group_commit_mgr.h"
+#include "util/debug_points.h"
 
 namespace doris {
 GroupCommitBlockSinkLocalState::~GroupCommitBlockSinkLocalState() {
     if (_load_block_queue) {
+        DBUG_EXECUTE_IF("GroupCommitBlockSink.delay_teardown_remove_load_id", {
+            int64_t waited_ms = 0;
+            while (_load_block_queue->group_commit_load_count.load() < 2 && 
waited_ms < 10000 &&
+                   DebugPoints::instance()->is_enable(
+                           
"GroupCommitBlockSink.delay_teardown_remove_load_id")) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(10));
+                waited_ms += 10;
+            }
+            LOG(INFO) << "debug delayed teardown remove_load_id, label=" << 
_load_block_queue->label
+                      << ", load_count=" << 
_load_block_queue->group_commit_load_count.load()
+                      << ", waited_ms=" << waited_ms;
+        });
         _remove_estimated_wal_bytes();
-        [[maybe_unused]] auto st = _load_block_queue->remove_load_id(
-                _parent->cast<GroupCommitBlockSinkOperatorX>()._load_id);
+        if (!_load_id_removed) {
+            [[maybe_unused]] auto st = _load_block_queue->remove_load_id(
+                    _parent->cast<GroupCommitBlockSinkOperatorX>()._load_id);
+        }
     } else {
         _state->exec_env()->group_commit_mgr()->remove_load_id(
                 _parent->cast<GroupCommitBlockSinkOperatorX>()._table_id,
@@ -223,6 +241,7 @@ Status GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
             if (_load_block_queue) {
                 _remove_estimated_wal_bytes();
                 [[maybe_unused]] auto st = 
_load_block_queue->remove_load_id(p._load_id);
+                _load_id_removed = true;
             }
             if 
(ExecEnv::GetInstance()->group_commit_mgr()->debug_future.wait_for(
                         std ::chrono ::seconds(60)) == std ::future_status 
::ready) {
@@ -312,6 +331,7 @@ Status GroupCommitBlockSinkOperatorX::sink_impl(RuntimeState* state, Block* inpu
             }
             local_state._remove_estimated_wal_bytes();
             [[maybe_unused]] auto st = 
local_state._load_block_queue->remove_load_id(_load_id);
+            local_state._load_id_removed = true;
         }
         return Status::OK();
     };
diff --git a/be/src/exec/operator/group_commit_block_sink_operator.h b/be/src/exec/operator/group_commit_block_sink_operator.h
index 854f2f3cc1d..e8469d29f2b 100644
--- a/be/src/exec/operator/group_commit_block_sink_operator.h
+++ b/be/src/exec/operator/group_commit_block_sink_operator.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <atomic>
+
 #include "exec/operator/operator.h"
 #include "load/group_commit/group_commit_mgr.h"
 #include "storage/tablet_info.h"
@@ -68,6 +70,8 @@ private:
     // used to calculate if meet the max filter ratio
     std::vector<std::shared_ptr<Block>> _blocks;
     bool _is_block_appended = false;
+    // True once this load's load_id has already been removed from the queue
+    std::atomic<bool> _load_id_removed = false;
     // used for find_partition
     std::unique_ptr<VOlapTablePartitionParam> _vpartition = nullptr;
     // reuse for find_tablet.
diff --git a/be/src/load/group_commit/group_commit_mgr.cpp b/be/src/load/group_commit/group_commit_mgr.cpp
index 976c6dceb02..238901888b6 100644
--- a/be/src/load/group_commit/group_commit_mgr.cpp
+++ b/be/src/load/group_commit/group_commit_mgr.cpp
@@ -51,6 +51,21 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state, std::shared_ptr<Bl
                                  bool write_wal, UniqueId& load_id) {
     DBUG_EXECUTE_IF("LoadBlockQueue.add_block.failed",
                     { return 
Status::InternalError("LoadBlockQueue.add_block.failed"); });
+    DBUG_EXECUTE_IF("LoadBlockQueue.add_block.block_reuse_second", {
+        int seq = _debug_add_block_seq.fetch_add(1);
+        if (seq >= 1) {
+            LOG(INFO) << "debug hold reuse 2nd+ add_block, label=" << label
+                      << ", load_id=" << load_id.to_string();
+            int64_t waited_ms = 0;
+            while (waited_ms < 10000 && DebugPoints::instance()->is_enable(
+                                                
"LoadBlockQueue.add_block.block_reuse_second")) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(10));
+                waited_ms += 10;
+            }
+            LOG(INFO) << "debug release reuse 2nd+ add_block, label=" << label
+                      << ", load_id=" << load_id.to_string() << ", waited_ms=" 
<< waited_ms;
+        }
+    });
     std::unique_lock l(mutex);
     RETURN_IF_ERROR(status);
     if (UNLIKELY(runtime_state->is_cancelled())) {
diff --git a/be/src/load/group_commit/group_commit_mgr.h b/be/src/load/group_commit/group_commit_mgr.h
index 5a79f3e1ac5..bf892a4302c 100644
--- a/be/src/load/group_commit/group_commit_mgr.h
+++ b/be/src/load/group_commit/group_commit_mgr.h
@@ -111,6 +111,10 @@ public:
     // counts of load in one group commit
     std::atomic_size_t group_commit_load_count = 0;
 
+    // only used by fault injection (debug point) to reproduce that multiple loads
+    // reuse one group commit plan and share a load_id
+    std::atomic<int> _debug_add_block_seq = 0;
+
     // the execute status of this internal group commit
     std::mutex mutex;
     std::atomic<bool> process_finish = false;
diff --git a/regression-test/suites/insert_p0/group_commit/test_group_commit_prepare_lost_row.groovy b/regression-test/suites/insert_p0/group_commit/test_group_commit_prepare_lost_row.groovy
new file mode 100644
index 00000000000..b3d84b40bb6
--- /dev/null
+++ b/regression-test/suites/insert_p0/group_commit/test_group_commit_prepare_lost_row.groovy
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import com.mysql.cj.jdbc.StatementImpl
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
+
+suite("test_group_commit_prepare_lost_row", "nonConcurrent") {
+    def dbName = "regression_test_insert_p0"
+    def table = dbName + ".test_group_commit_prepare_lost_row"
+    def dpDelayRemove = "GroupCommitBlockSink.delay_teardown_remove_load_id"
+    def dpBlockSecond = "LoadBlockQueue.add_block.block_reuse_second"
+    // large enough that a freshly created queue is NOT need_commit before the next load joins,
+    // yet small enough that the consumer commits while the 2nd load's add_block is held.
+    def intervalMs = 1000
+
+    sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+    sql "drop table if exists ${table}"
+    sql """
+        CREATE TABLE ${table} (
+            `id` int(11) NOT NULL,
+            `name` varchar(50) NULL,
+            `score` int(11) NULL
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`, `name`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "group_commit_interval_ms" = "${intervalMs}",
+            "replication_num" = "1"
+        );
+    """
+
+    def user = context.config.jdbcUser
+    def password = context.config.jdbcPassword
+    // server-side prepared statement + async group commit (so the plan can be reused)
+    def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName, false) +
+            "&sessionVariables=group_commit=async_mode"
+    logger.info("jdbc url: " + url)
+
+    def beIps = [:]
+    def bePorts = [:]
+    getBackendIpHttpPort(beIps, bePorts)
+    def beHttpList = []
+    beIps.each { id, ip -> beHttpList.add([ip as String, bePorts[id] as int]) }
+    logger.info("BE http endpoints (pre-captured): " + beHttpList)
+    def disableDpOnAllBEs = { String name ->
+        beHttpList.each { hp ->
+            DebugPoint.disableDebugPoint(hp[0] as String, hp[1] as int, NodeType.BE, name)
+        }
+    }
+
+    def insertOne = { stmt, int id, String name, Integer score ->
+        stmt.setInt(1, id)
+        stmt.setString(2, name)
+        if (score == null) {
+            stmt.setNull(3, java.sql.Types.INTEGER)
+        } else {
+            stmt.setInt(3, score)
+        }
+        stmt.addBatch()
+    }
+
+    def serverInfoOf = { stmt ->
+        def results = ((StatementImpl) stmt).results
+        return results != null ? results.getServerInfo() : null
+    }
+
+    connect(user, password, url) {
+        def insert_stmt = prepareStatement """ INSERT INTO ${table} VALUES(?, ?, ?) """
+        assertEquals(com.mysql.cj.jdbc.ServerPreparedStatement, insert_stmt.class)
+
+        try {
+            // 1. prime the prepared plan: the first execute creates and caches the reusable
+            //    serialized plan (and bakes load_id X). Not a reuse yet.
+            insertOne(insert_stmt, 1, "a", 10)
+            insert_stmt.executeBatch()
+            logger.info("id=1 serverInfo: " + serverInfoOf(insert_stmt))
+            // let the id=1 queue commit and tear down before the dance, so the next execute
+            // creates a fresh queue rather than joining id=1's.
+            sleep(intervalMs + 2000)
+
+            // arm the debug points only for the id=2 / id=3 dance.
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            GetDebugPoint().enableDebugPointForAllBEs(dpDelayRemove)
+            GetDebugPoint().enableDebugPointForAllBEs(dpBlockSecond)
+
+            // 2. statement A (id=2): reuses the plan -> shares load_id X. Creates a fresh queue
+            //    Q, appends its block (1st add_block on Q proceeds), and returns. Its async
+            //    teardown remove(X) is now held until a 2nd load joins Q.
+            insertOne(insert_stmt, 2, "b", 20)
+            insert_stmt.executeBatch()
+            def infoA = serverInfoOf(insert_stmt)
+            logger.info("id=2 (A) serverInfo: " + infoA)
+            // the shared load_id only exists when the plan is actually reused
+            assertTrue(infoA != null && infoA.contains("reuse_group_commit_plan"),
+                    "id=2 must reuse the group commit plan (shared load_id); serverInfo=" + infoA)
+
+            // 3. statement B (id=3): reuses the plan (same load_id X), joins Q via add_load_id(X).
+            //    Pre-fix this releases A's held teardown remove(X), which erases B's shared-key
+            //    registration. With the fix, A already removed X in wind_up(), so teardown skips
+            //    the redundant remove. B then blocks at add_block until we release it.
+            insertOne(insert_stmt, 3, "c", 30)
+            def errB = new java.util.concurrent.atomic.AtomicReference<String>(null)
+            def tB = Thread.start {
+                try {
+                    insert_stmt.executeBatch()
+                    logger.info("id=3 (B) serverInfo: " + serverInfoOf(insert_stmt))
+                } catch (Throwable e) {
+                    errB.set(String.valueOf(e)) // never null, unlike e.getMessage()
+                    logger.info("id=3 (B) executeBatch threw: " + e.getMessage())
+                }
+            }
+
+            // 4. while B is held at add_block: pre-fix A's teardown clobbered the shared key, so
+            //    the consumer committed Q (interval elapsed) with only A's row.
+            sleep(intervalMs + 2000)
+
+            // 5. release B's add_block: pre-fix it lands on the already-committed queue and is
+            //    silently lost; with the fix B's registration stays in Q until B finishes, so
+            //    the consumer waits and commits both rows.
+            // disable via raw HTTP (NOT sql) -- the JDBC connection is held by tB's blocked
+            // executeBatch, so routing this through `sql "show backends"` would deadlock.
+            disableDpOnAllBEs(dpBlockSecond)
+            disableDpOnAllBEs(dpDelayRemove)
+            tB.join()
+            assertTrue(errB.get() == null, "id=3 (B) executeBatch must succeed; a failed insert is not the silent lost-row bug: " + errB.get())
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            try { insert_stmt.close() } catch (Throwable ignore) {}
+        }
+    }
+
+    // both reused-plan rows must be present. If the lost-row race regresses, id=3 is dropped.
+    def rows = null
+    for (int i = 0; i < 20; i++) {
+        sleep(1000)
+        rows = sql "select id, name, score from ${table} where id in (2, 3) order by id"
+        logger.info("rows(id in 2,3) = " + rows + ", retry = " + i)
+        if (rows.size() >= 2) break
+    }
+    assertEquals(2, rows.size(),
+            "group commit lost a reused-plan row: expected id=2 and id=3, got " + rows)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to