This is an automated email from the ASF dual-hosted git repository.
luwei16 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1526741ebc8 [fix](cloud) Refresh base tablet before schema change V1
(#64312)
1526741ebc8 is described below
commit 1526741ebc849f18dd8a2ede9c44afe88201f54a
Author: Jimmy <[email protected]>
AuthorDate: Thu Jun 11 19:55:52 2026 +0800
[fix](cloud) Refresh base tablet before schema change V1 (#64312)
Related PR: #62272
Problem Summary: Cloud schema change retries could keep registering a
stale alter version after a cross-V1 compaction conflict, where V1 is the
schema change boundary version computed from the base tablet. The retry
path synced the base tablet only up to the request's alter version, so a
local base tablet whose max version already exceeded that version could
skip refreshing newer visible rowsets. When the new schema-change tablet
had a compaction rowset crossing the stale V1, every retry reused that V1
and eventually exhausted the retry limit.
This change syncs the base tablet to its latest visible rowsets, no longer
capping the sync at the request's alter version, before computing the
schema change V1, so that retries register a fresh boundary. It also adds
a BE unit test covering the refresh and a docker regression case that
blocks schema change, creates a cross-V1 retry, injects stale local
max-version state for query-version sync, and verifies the retry finishes
after the base tablet is refreshed.
### Release note
Fix cloud schema change retry stale V1 failure.
---
be/src/cloud/cloud_schema_change_job.cpp | 3 +-
be/src/cloud/cloud_tablet.cpp | 12 ++
be/test/cloud/cloud_schema_change_job_test.cpp | 78 ++++++++++
...c_compaction_cross_v1_stale_base_refresh.groovy | 158 +++++++++++++++++++++
4 files changed, 250 insertions(+), 1 deletion(-)
diff --git a/be/src/cloud/cloud_schema_change_job.cpp
b/be/src/cloud/cloud_schema_change_job.cpp
index 9b0d0fcc379..d6932a922da 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -106,7 +106,8 @@ Status CloudSchemaChangeJob::process_alter_tablet(const
TAlterTabletReqV2& reque
}
// MUST sync rowsets before capturing rowset readers and building
DeleteHandler
SyncOptions options;
- options.query_version = request.alter_version;
+ // The SC boundary (V1) must be calculated from the latest visible rowsets of the base
+ // tablet. Do not cap this sync by request.alter_version, which may be stale across retries.
RETURN_IF_ERROR(_base_tablet->sync_rowsets(options));
// ATTN: Only convert rowsets of version larger than 1, MUST let the new
tablet cache have rowset [0-1]
_output_cumulative_point = _base_tablet->cumulative_layer_point();
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 52489d1448c..3522871f557 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -288,6 +288,18 @@ Status CloudTablet::sync_rowsets(const SyncOptions&
options, SyncRowsetStats* st
RETURN_IF_ERROR(sync_if_not_running(stats));
if (options.query_version > 0) {
+
DBUG_EXECUTE_IF("CloudTablet::sync_rowsets.stale_local_max_for_query_version", {
+ auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+ auto stale_version = dp->param<int64_t>("version", -1);
+ if (target_tablet_id == tablet_id() && stale_version >= 0) {
+ std::unique_lock wlock(_meta_lock);
+ LOG(INFO) << "override cloud tablet local max_version for
query_version sync"
+ << ", tablet_id=" << tablet_id() << ",
old_max_version=" << _max_version
+ << ", stale_version=" << stale_version
+ << ", query_version=" << options.query_version;
+ _max_version = stale_version;
+ }
+ });
auto lock_start = std::chrono::steady_clock::now();
std::shared_lock rlock(_meta_lock);
if (stats) {
diff --git a/be/test/cloud/cloud_schema_change_job_test.cpp
b/be/test/cloud/cloud_schema_change_job_test.cpp
index 972ff2af255..74b27bed7ad 100644
--- a/be/test/cloud/cloud_schema_change_job_test.cpp
+++ b/be/test/cloud/cloud_schema_change_job_test.cpp
@@ -205,6 +205,84 @@ TEST_F(CloudSchemaChangeJobTest,
FillVersionHolesBeforeNewTabletRunning) {
ASSERT_EQ(versions[1], Version(4, 4));
}
+TEST_F(CloudSchemaChangeJobTest,
RefreshBaseTabletBeforeRegisteringSchemaChangeJob) {
+ int64_t base_tablet_id = 50001;
+ int64_t new_tablet_id = 50002;
+
+ TabletMetaSharedPtr base_meta(new TabletMeta(
+ 1, 2, base_tablet_id, base_tablet_id + 100, 4, 5, TTabletSchema(),
6, {{7, 8}},
+ UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F));
+ TabletMetaSharedPtr new_meta(new TabletMeta(
+ 1, 2, new_tablet_id, new_tablet_id + 100, 4, 5, TTabletSchema(),
6, {{7, 8}},
+ UniqueId(11, 12), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F));
+
+ auto base_tablet = std::make_shared<CloudTablet>(_engine,
std::move(base_meta));
+ auto new_tablet = std::make_shared<CloudTablet>(_engine,
std::move(new_meta));
+ static_cast<void>(new_tablet->set_tablet_state(TABLET_NOTREADY));
+
+ auto base_initial_rowset = create_rowset(base_tablet->tablet_schema(),
base_tablet_id, 2, 6);
+ auto base_latest_rowset = create_rowset(base_tablet->tablet_schema(),
base_tablet_id, 7, 10);
+
+ auto* sp = SyncPoint::get_instance();
+ sp->clear_all_call_backs();
+ sp->enable_processing();
+
+ sp->set_call_back("CloudMetaMgr::get_tablet_meta", [&](auto&& args) {
+ auto tablet_id = try_any_cast<int64_t>(args[0]);
+ auto* meta_ptr = try_any_cast<TabletMetaSharedPtr*>(args[1]);
+ if (tablet_id == base_tablet_id) {
+ *meta_ptr = base_tablet->tablet_meta();
+ } else if (tablet_id == new_tablet_id) {
+ *meta_ptr = new_tablet->tablet_meta();
+ }
+ try_any_cast_ret<Status>(args)->second = true;
+ });
+
+ int base_sync_count = 0;
+ sp->set_call_back("CloudMetaMgr::sync_tablet_rowsets", [&](auto&& outcome)
{
+ auto* tablet = try_any_cast<CloudTablet*>(outcome[0]);
+ if (tablet->tablet_id() == base_tablet_id) {
+ ++base_sync_count;
+ std::unique_lock lock(tablet->get_header_lock());
+ if (base_sync_count == 1) {
+ tablet->add_rowsets({base_initial_rowset}, false, lock);
+ } else {
+ tablet->add_rowsets({base_latest_rowset}, false, lock);
+ }
+ }
+ auto* pairs = try_any_cast_ret<Status>(outcome);
+ pairs->second = true;
+ pairs->first = Status::OK();
+ });
+
+ int64_t prepared_alter_version = -1;
+ sp->set_call_back("CloudMetaMgr::prepare_tablet_job", [&](auto&& outcome) {
+ auto job = try_any_cast<cloud::TabletJobInfoPB>(outcome[0]);
+ ASSERT_TRUE(job.has_schema_change());
+ prepared_alter_version = job.schema_change().alter_version();
+
+ auto* pairs = try_any_cast_ret<Status>(outcome);
+ pairs->second = true;
+ pairs->first = Status::InternalError("mock job already success");
+
+ auto* resp = try_any_cast<cloud::StartTabletJobResponse*>(outcome[1]);
+ resp->mutable_status()->set_code(cloud::JOB_ALREADY_SUCCESS);
+ });
+
+ TAlterTabletReqV2 request;
+ request.base_tablet_id = base_tablet_id;
+ request.new_tablet_id = new_tablet_id;
+ request.alter_version = 4;
+ request.__set_alter_tablet_type(TAlterTabletType::SCHEMA_CHANGE);
+
+ CloudSchemaChangeJob sc_job(_engine, "test_refresh_base_tablet_before_sc",
9999999999);
+ auto status = sc_job.process_alter_tablet(request);
+
+ ASSERT_TRUE(status.ok()) << status.to_string();
+ ASSERT_EQ(base_sync_count, 2);
+ ASSERT_EQ(prepared_alter_version, 10);
+}
+
// Test: cross-V1 compaction detected → abort SC job → return
SC_COMPACTION_CONFLICT
TEST_F(CloudSchemaChangeJobTest, CrossV1CompactionDetected) {
int64_t base_tablet_id = 10001;
diff --git
a/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_stale_base_refresh.groovy
b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_stale_base_refresh.groovy
new file mode 100644
index 00000000000..679385562b5
--- /dev/null
+++
b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_stale_base_refresh.groovy
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Test: schema change retry refreshes base tablet rowsets before registering a fresh V1.
+//
+// Timeline:
+// 1. Block SC, insert data, and let new tablet compaction create a cross-V1 rowset.
+// 2. Force V1=6 once to prove the cross-V1 retry path is active.
+// 3. Disable the V1 override, but inject stale local base max=6 only for query_version sync.
+// 4. Retry must refresh the base tablet and finish with the latest V1.
+//
+// The stale-local-max debug point only fires when the caller uses SyncOptions.query_version.
+// Old code capped SC base sync by request.alter_version, so it keeps V1 stale and cannot finish.
+// The fixed path does not set query_version, so it refreshes from meta-service and succeeds.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite('test_sc_compaction_cross_v1_stale_base_refresh', 'docker') {
+
+ def options = new ClusterOptions()
+ options.cloudMode = true
+ options.enableDebugPoints()
+ options.beConfigs += ["enable_java_support=false"]
+ options.beConfigs += ["enable_new_tablet_do_compaction=true"]
+ options.beConfigs += ["alter_tablet_worker_count=1"]
+ options.beConfigs += ["cumulative_compaction_min_deltas=2"]
+ options.beNum = 1
+ options.feConfigs += ["http_port=8030"]
+ options.feConfigs += ["rpc_port=9020"]
+ options.feConfigs += ["query_port=9030"]
+ options.feConfigs += ["edit_log_port=9010"]
+ options.feConfigs += ["enable_schema_change_retry=true"]
+ options.feConfigs += ["schema_change_max_retry_time=10"]
+
+ docker(options) {
+ def getJobState = {
+ def result = sql """
+ SHOW ALTER TABLE COLUMN
+ WHERE IndexName='sc_cross_v1_stale_base_refresh_test'
+ ORDER BY createtime DESC LIMIT 1
+ """
+ logger.info("getJobState: ${result}")
+ return result[0][9]
+ }
+
+ sql "DROP TABLE IF EXISTS sc_cross_v1_stale_base_refresh_test"
+ sql """
+ CREATE TABLE sc_cross_v1_stale_base_refresh_test (
+ k1 int NOT NULL,
+ v1 varchar(100) NOT NULL,
+ v2 int NOT NULL
+ )
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ def tablets = sql_return_maparray "SHOW TABLETS FROM
sc_cross_v1_stale_base_refresh_test"
+ assertEquals(1, tablets.size())
+ def baseTabletId = tablets[0].TabletId.toString()
+ logger.info("base tablet id for stale refresh test: ${baseTabletId}")
+
+ for (int i = 0; i < 3; i++) {
+ StringBuilder sb = new StringBuilder()
+ sb.append("INSERT INTO sc_cross_v1_stale_base_refresh_test VALUES
")
+ for (int j = 0; j < 20; j++) {
+ if (j > 0) {
+ sb.append(", ")
+ }
+ def key = i * 20 + j + 1
+ sb.append("(${key}, 'val_${key}', ${key * 10})")
+ }
+ sql sb.toString()
+ }
+ assertEquals(60L, (sql "SELECT count(*) FROM
sc_cross_v1_stale_base_refresh_test")[0][0])
+
+ def scBlock = 'CloudSchemaChangeJob::process_alter_tablet.block'
+ def overrideDP =
'CloudSchemaChangeJob::process_alter_tablet.override_base_max_version'
+ def staleMaxDP =
'CloudTablet::sync_rowsets.stale_local_max_for_query_version'
+
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs(scBlock)
+ try {
+ sql "ALTER TABLE sc_cross_v1_stale_base_refresh_test MODIFY
COLUMN v2 bigint"
+ sleep(10000)
+ assertEquals("RUNNING", getJobState())
+
+ for (int i = 0; i < 6; i++) {
+ StringBuilder sb = new StringBuilder()
+ sb.append("INSERT INTO sc_cross_v1_stale_base_refresh_test
VALUES ")
+ for (int j = 0; j < 10; j++) {
+ if (j > 0) {
+ sb.append(", ")
+ }
+ def key = 100 + i * 10 + j + 1
+ sb.append("(${key}, 'new_${key}', ${key * 10})")
+ }
+ sql sb.toString()
+ }
+
+ sleep(30000)
+ GetDebugPoint().enableDebugPointForAllBEs(overrideDP,
[version: 6])
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(scBlock)
+ }
+
+ sleep(10000)
+ assertEquals("RUNNING", getJobState(),
+ "SC should still be RUNNING while V1 is forced to cross
the compacted rowset")
+
+ GetDebugPoint().enableDebugPointForAllBEs(
+ staleMaxDP, [tablet_id: baseTabletId, version: 6])
+ GetDebugPoint().disableDebugPointForAllBEs(overrideDP)
+
+ int maxTries = 180
+ def finalState = ""
+ while (maxTries-- > 0) {
+ finalState = getJobState()
+ if (finalState == "FINISHED" || finalState == "CANCELLED") {
+ break
+ }
+ sleep(1000)
+ }
+
+ logger.info("SC final state after stale base refresh retry:
${finalState}")
+ assertEquals("FINISHED", finalState)
+
+ assertEquals(120L,
+ (sql "SELECT count(*) FROM
sc_cross_v1_stale_base_refresh_test")[0][0])
+
+ def columns = sql "DESC sc_cross_v1_stale_base_refresh_test"
+ def v2Col = columns.find { it[0] == "v2" }
+ assertTrue(v2Col[1].toString().toLowerCase().contains("bigint"),
+ "v2 column should be bigint after schema change, got:
${v2Col[1]}")
+
+ def backends = sql_return_maparray("SHOW BACKENDS")
+ assertTrue(backends.every { it.Alive.toString() == "true" },
+ "BE should be alive after stale base refresh retry")
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]