This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0e260a1b328 [fix](group commit) fix lost row when prepared stmt reused
plan shares one load_id (#64362)
0e260a1b328 is described below
commit 0e260a1b328ae67a4be3ab85d1063783d414d70a
Author: meiyi <[email protected]>
AuthorDate: Tue Jun 30 10:30:42 2026 +0800
[fix](group commit) fix lost row when prepared stmt reused plan shares one
load_id (#64362)
1. When several inserts reuse one server-side prepared group commit
plan, they share a single load_id X
(query_id/load_id/fragment_instance_id are baked into the reused
serialized plan).
2. The BE uses X as the per-queue registration key in
LoadBlockQueue::_load_ids_to_write_dep, and remove_load_id(X) is issued
from two sites: wind_up() (synchronous, before the RPC returns and
correctly ordered by FE serialization) and
~GroupCommitBlockSinkLocalState (async fragment teardown, after the RPC
has returned).
3. FE serialization does not order one load's async teardown remove(X)
against the next load's add_load_id(X), so the teardown can erase the
next load's in-flight registration; the consumer then sees no writer in
flight, commits the queue early, and the next load's add_block lands on
the already-committed queue and is silently lost while FE reports
success.
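Fix: track whether the synchronous path already removed the load_id
(GroupCommitBlockSinkLocalState::_load_id_removed, set right after the
remove_load_id() calls in _add_blocks() and sink_impl()). The destructor
then suppresses the redundant remove on the normal path. It keeps the
remove only as a fallback for abnormal (cancel/error) paths where
wind_up did not run.
Note: the fallback remove is still issued from async teardown. On those
abnormal paths it may therefore remain unordered against a following
load's add_load_id(X).
The patch also adds two debug points
(GroupCommitBlockSink.delay_teardown_remove_load_id and
LoadBlockQueue.add_block.block_reuse_second). A new regression suite,
test_group_commit_prepare_lost_row, uses them to reproduce the race.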
---
.../operator/group_commit_block_sink_operator.cpp | 24 +++-
.../operator/group_commit_block_sink_operator.h | 4 +
be/src/load/group_commit/group_commit_mgr.cpp | 15 ++
be/src/load/group_commit/group_commit_mgr.h | 4 +
.../test_group_commit_prepare_lost_row.groovy | 157 +++++++++++++++++++++
5 files changed, 202 insertions(+), 2 deletions(-)
diff --git a/be/src/exec/operator/group_commit_block_sink_operator.cpp
b/be/src/exec/operator/group_commit_block_sink_operator.cpp
index 38ea6d3c18a..e441882f73b 100644
--- a/be/src/exec/operator/group_commit_block_sink_operator.cpp
+++ b/be/src/exec/operator/group_commit_block_sink_operator.cpp
@@ -19,15 +19,33 @@
#include <gen_cpp/DataSinks_types.h>
+#include <chrono>
+#include <thread>
+
#include "exec/sink/vtablet_block_convertor.h"
#include "load/group_commit/group_commit_mgr.h"
+#include "util/debug_points.h"
namespace doris {
GroupCommitBlockSinkLocalState::~GroupCommitBlockSinkLocalState() {
if (_load_block_queue) {
+ DBUG_EXECUTE_IF("GroupCommitBlockSink.delay_teardown_remove_load_id", {
+ int64_t waited_ms = 0;
+ while (_load_block_queue->group_commit_load_count.load() < 2 &&
waited_ms < 10000 &&
+ DebugPoints::instance()->is_enable(
+
"GroupCommitBlockSink.delay_teardown_remove_load_id")) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ waited_ms += 10;
+ }
+ LOG(INFO) << "debug delayed teardown remove_load_id, label=" <<
_load_block_queue->label
+ << ", load_count=" <<
_load_block_queue->group_commit_load_count.load()
+ << ", waited_ms=" << waited_ms;
+ });
_remove_estimated_wal_bytes();
- [[maybe_unused]] auto st = _load_block_queue->remove_load_id(
- _parent->cast<GroupCommitBlockSinkOperatorX>()._load_id);
+ if (!_load_id_removed) {
+ [[maybe_unused]] auto st = _load_block_queue->remove_load_id(
+ _parent->cast<GroupCommitBlockSinkOperatorX>()._load_id);
+ }
} else {
_state->exec_env()->group_commit_mgr()->remove_load_id(
_parent->cast<GroupCommitBlockSinkOperatorX>()._table_id,
@@ -223,6 +241,7 @@ Status
GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
if (_load_block_queue) {
_remove_estimated_wal_bytes();
[[maybe_unused]] auto st =
_load_block_queue->remove_load_id(p._load_id);
+ _load_id_removed = true;
}
if
(ExecEnv::GetInstance()->group_commit_mgr()->debug_future.wait_for(
std ::chrono ::seconds(60)) == std ::future_status
::ready) {
@@ -312,6 +331,7 @@ Status
GroupCommitBlockSinkOperatorX::sink_impl(RuntimeState* state, Block* inpu
}
local_state._remove_estimated_wal_bytes();
[[maybe_unused]] auto st =
local_state._load_block_queue->remove_load_id(_load_id);
+ local_state._load_id_removed = true;
}
return Status::OK();
};
diff --git a/be/src/exec/operator/group_commit_block_sink_operator.h
b/be/src/exec/operator/group_commit_block_sink_operator.h
index 854f2f3cc1d..e8469d29f2b 100644
--- a/be/src/exec/operator/group_commit_block_sink_operator.h
+++ b/be/src/exec/operator/group_commit_block_sink_operator.h
@@ -17,6 +17,8 @@
#pragma once
+#include <atomic>
+
#include "exec/operator/operator.h"
#include "load/group_commit/group_commit_mgr.h"
#include "storage/tablet_info.h"
@@ -68,6 +70,8 @@ private:
// used to calculate if meet the max filter ratio
std::vector<std::shared_ptr<Block>> _blocks;
bool _is_block_appended = false;
+ // True once this load's load_id has already been removed from the queue
+ std::atomic<bool> _load_id_removed = false;
// used for find_partition
std::unique_ptr<VOlapTablePartitionParam> _vpartition = nullptr;
// reuse for find_tablet.
diff --git a/be/src/load/group_commit/group_commit_mgr.cpp
b/be/src/load/group_commit/group_commit_mgr.cpp
index 976c6dceb02..238901888b6 100644
--- a/be/src/load/group_commit/group_commit_mgr.cpp
+++ b/be/src/load/group_commit/group_commit_mgr.cpp
@@ -51,6 +51,21 @@ Status LoadBlockQueue::add_block(RuntimeState*
runtime_state, std::shared_ptr<Bl
bool write_wal, UniqueId& load_id) {
DBUG_EXECUTE_IF("LoadBlockQueue.add_block.failed",
{ return
Status::InternalError("LoadBlockQueue.add_block.failed"); });
+ DBUG_EXECUTE_IF("LoadBlockQueue.add_block.block_reuse_second", {
+ int seq = _debug_add_block_seq.fetch_add(1);
+ if (seq >= 1) {
+ LOG(INFO) << "debug hold reuse 2nd+ add_block, label=" << label
+ << ", load_id=" << load_id.to_string();
+ int64_t waited_ms = 0;
+ while (waited_ms < 10000 && DebugPoints::instance()->is_enable(
+
"LoadBlockQueue.add_block.block_reuse_second")) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ waited_ms += 10;
+ }
+ LOG(INFO) << "debug release reuse 2nd+ add_block, label=" << label
+ << ", load_id=" << load_id.to_string() << ", waited_ms="
<< waited_ms;
+ }
+ });
std::unique_lock l(mutex);
RETURN_IF_ERROR(status);
if (UNLIKELY(runtime_state->is_cancelled())) {
diff --git a/be/src/load/group_commit/group_commit_mgr.h
b/be/src/load/group_commit/group_commit_mgr.h
index 5a79f3e1ac5..bf892a4302c 100644
--- a/be/src/load/group_commit/group_commit_mgr.h
+++ b/be/src/load/group_commit/group_commit_mgr.h
@@ -111,6 +111,10 @@ public:
// counts of load in one group commit
std::atomic_size_t group_commit_load_count = 0;
+ // only used by fault injection (debug point) to reproduce that multiple
loads
+ // reuse one group commit plan and share a load_id
+ std::atomic<int> _debug_add_block_seq = 0;
+
// the execute status of this internal group commit
std::mutex mutex;
std::atomic<bool> process_finish = false;
diff --git
a/regression-test/suites/insert_p0/group_commit/test_group_commit_prepare_lost_row.groovy
b/regression-test/suites/insert_p0/group_commit/test_group_commit_prepare_lost_row.groovy
new file mode 100644
index 00000000000..b3d84b40bb6
--- /dev/null
+++
b/regression-test/suites/insert_p0/group_commit/test_group_commit_prepare_lost_row.groovy
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import com.mysql.cj.jdbc.StatementImpl
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
+
+suite("test_group_commit_prepare_lost_row", "nonConcurrent") {
+ def dbName = "regression_test_insert_p0"
+ def table = dbName + ".test_group_commit_prepare_lost_row"
+ def dpDelayRemove = "GroupCommitBlockSink.delay_teardown_remove_load_id"
+ def dpBlockSecond = "LoadBlockQueue.add_block.block_reuse_second"
+ // large enough that a freshly created queue is NOT need_commit before the
next load joins,
+ // yet small enough that the consumer commits while the 2nd load's
add_block is held.
+ def intervalMs = 1000
+
+ sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+ sql "drop table if exists ${table}"
+ sql """
+ CREATE TABLE ${table} (
+ `id` int(11) NOT NULL,
+ `name` varchar(50) NULL,
+ `score` int(11) NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`, `name`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "group_commit_interval_ms" = "${intervalMs}",
+ "replication_num" = "1"
+ );
+ """
+
+ def user = context.config.jdbcUser
+ def password = context.config.jdbcPassword
+ // server-side prepared statement + async group commit (so the plan can be
reused)
+ def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName, false) +
+ "&sessionVariables=group_commit=async_mode"
+ logger.info("jdbc url: " + url)
+
+ def beIps = [:]
+ def bePorts = [:]
+ getBackendIpHttpPort(beIps, bePorts)
+ def beHttpList = []
+ beIps.each { id, ip -> beHttpList.add([ip as String, bePorts[id] as int]) }
+ logger.info("BE http endpoints (pre-captured): " + beHttpList)
+ def disableDpOnAllBEs = { String name ->
+ beHttpList.each { hp ->
+ DebugPoint.disableDebugPoint(hp[0] as String, hp[1] as int,
NodeType.BE, name)
+ }
+ }
+
+ def insertOne = { stmt, int id, String name, Integer score ->
+ stmt.setInt(1, id)
+ stmt.setString(2, name)
+ if (score == null) {
+ stmt.setNull(3, java.sql.Types.INTEGER)
+ } else {
+ stmt.setInt(3, score)
+ }
+ stmt.addBatch()
+ }
+
+ def serverInfoOf = { stmt ->
+ def results = ((StatementImpl) stmt).results
+ return results != null ? results.getServerInfo() : null
+ }
+
+ connect(user, password, url) {
+ def insert_stmt = prepareStatement """ INSERT INTO ${table} VALUES(?,
?, ?) """
+ assertEquals(com.mysql.cj.jdbc.ServerPreparedStatement,
insert_stmt.class)
+
+ try {
+ // 1. prime the prepared plan: the first execute creates and
caches the reusable
+ // serialized plan (and bakes load_id X). Not a reuse yet.
+ insertOne(insert_stmt, 1, "a", 10)
+ insert_stmt.executeBatch()
+ logger.info("id=1 serverInfo: " + serverInfoOf(insert_stmt))
+ // let the id=1 queue commit and tear down before the dance, so
the next execute
+ // creates a fresh queue rather than joining id=1's.
+ sleep(intervalMs + 2000)
+
+ // arm the debug points only for the id=2 / id=3 dance.
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ GetDebugPoint().enableDebugPointForAllBEs(dpDelayRemove)
+ GetDebugPoint().enableDebugPointForAllBEs(dpBlockSecond)
+
+ // 2. statement A (id=2): reuses the plan -> shares load_id X.
Creates a fresh queue
+ // Q, appends its block (1st add_block on Q proceeds), and
returns. Its async
+ // teardown remove(X) is now held until a 2nd load joins Q.
+ insertOne(insert_stmt, 2, "b", 20)
+ insert_stmt.executeBatch()
+ def infoA = serverInfoOf(insert_stmt)
+ logger.info("id=2 (A) serverInfo: " + infoA)
+ // the shared load_id only exists when the plan is actually reused
+ assertTrue(infoA != null &&
infoA.contains("reuse_group_commit_plan"),
+ "id=2 must reuse the group commit plan (shared load_id);
serverInfo=" + infoA)
+
+ // 3. statement B (id=3): reuses the plan (same load_id X), joins
Q via add_load_id(X).
+ // Pre-fix this releases A's held teardown remove(X), which
erases B's shared-key
+ // registration. With the fix, A already removed X in
wind_up(), so teardown skips
+ // the redundant remove. B then blocks at add_block until we
release it.
+ insertOne(insert_stmt, 3, "c", 30)
+ def errB = new
java.util.concurrent.atomic.AtomicReference<String>(null)
+ def tB = Thread.start {
+ try {
+ insert_stmt.executeBatch()
+ logger.info("id=3 (B) serverInfo: " +
serverInfoOf(insert_stmt))
+ } catch (Throwable e) {
+ errB.set(e.getMessage())
+ logger.info("id=3 (B) executeBatch threw: " +
e.getMessage())
+ }
+ }
+
+ // 4. while B is held at add_block: pre-fix A's teardown clobbered
the shared key, so
+ // the consumer committed Q (interval elapsed) with only A's
row.
+ sleep(intervalMs + 2000)
+
+ // 5. release B's add_block: pre-fix it lands on the
already-committed queue and is
+ // silently lost; with the fix B's registration stays in Q
until B finishes, so
+ // the consumer waits and commits both rows.
+ // disable via raw HTTP (NOT sql) -- the JDBC connection is held
by tB's blocked
+ // executeBatch, so routing this through `sql "show backends"`
would deadlock.
+ disableDpOnAllBEs(dpBlockSecond)
+ disableDpOnAllBEs(dpDelayRemove)
+ tB.join()
+ logger.info("id=3 (B) error (if any): " + errB.get())
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ try { insert_stmt.close() } catch (Throwable ignore) {}
+ }
+ }
+
+ // both reused-plan rows must be present. If the lost-row race regresses,
id=3 is dropped.
+ def rows = null
+ for (int i = 0; i < 20; i++) {
+ sleep(1000)
+ rows = sql "select id, name, score from ${table} where id in (2, 3)
order by id"
+ logger.info("rows(id in 2,3) = " + rows + ", retry = " + i)
+ if (rows.size() >= 2) break
+ }
+ assertEquals(2, rows.size(),
+ "group commit lost a reused-plan row: expected id=2 and id=3, got
" + rows)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]