This is an automated email from the ASF dual-hosted git repository.

luwei16 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1526741ebc8 [fix](cloud) Refresh base tablet before schema change V1 (#64312)
1526741ebc8 is described below

commit 1526741ebc849f18dd8a2ede9c44afe88201f54a
Author: Jimmy <[email protected]>
AuthorDate: Thu Jun 11 19:55:52 2026 +0800

    [fix](cloud) Refresh base tablet before schema change V1 (#64312)
    
    Related PR: #62272
    
    Problem Summary: Cloud schema change retries could keep registering a
    stale alter version (V1, the schema change boundary version) after a
    cross-V1 compaction conflict. The retry path synced the base tablet only
    up to the request's alter version, so a base tablet whose local max
    version already exceeded that version could skip fetching newer visible
    rowsets. When a new schema-change tablet had a compaction rowset
    crossing the stale V1, every retry reused that V1 and eventually
    exhausted the retry limit.
    
    Before computing the schema change V1, this change now refreshes the
    base tablet without capping the sync at the request's alter version,
    which allows retries to register a fresh boundary. It also adds a docker
    regression case that blocks schema change, creates a cross-V1 retry,
    injects stale local max-version state for query-version sync, and
    verifies that the retry finishes after the base tablet is refreshed.
    
    ### Release note
    
    Fix cloud schema change retries that kept failing because they reused a
    stale V1 after a cross-V1 compaction conflict.
---
 be/src/cloud/cloud_schema_change_job.cpp           |   3 +-
 be/src/cloud/cloud_tablet.cpp                      |  12 ++
 be/test/cloud/cloud_schema_change_job_test.cpp     |  78 ++++++++++
 ...c_compaction_cross_v1_stale_base_refresh.groovy | 158 +++++++++++++++++++++
 4 files changed, 250 insertions(+), 1 deletion(-)

diff --git a/be/src/cloud/cloud_schema_change_job.cpp 
b/be/src/cloud/cloud_schema_change_job.cpp
index 9b0d0fcc379..d6932a922da 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -106,7 +106,8 @@ Status CloudSchemaChangeJob::process_alter_tablet(const 
TAlterTabletReqV2& reque
     }
     // MUST sync rowsets before capturing rowset readers and building 
DeleteHandler
     SyncOptions options;
-    options.query_version = request.alter_version;
+    // The SC boundary (V1) must be calculated from the latest visible rowsets 
of the base
+    // tablet. Do not cap this sync by request.alter_version, which may be 
stale across retries.
     RETURN_IF_ERROR(_base_tablet->sync_rowsets(options));
     // ATTN: Only convert rowsets of version larger than 1, MUST let the new 
tablet cache have rowset [0-1]
     _output_cumulative_point = _base_tablet->cumulative_layer_point();
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 52489d1448c..3522871f557 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -288,6 +288,18 @@ Status CloudTablet::sync_rowsets(const SyncOptions& 
options, SyncRowsetStats* st
     RETURN_IF_ERROR(sync_if_not_running(stats));
 
     if (options.query_version > 0) {
+        
DBUG_EXECUTE_IF("CloudTablet::sync_rowsets.stale_local_max_for_query_version", {
+            auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+            auto stale_version = dp->param<int64_t>("version", -1);
+            if (target_tablet_id == tablet_id() && stale_version >= 0) {
+                std::unique_lock wlock(_meta_lock);
+                LOG(INFO) << "override cloud tablet local max_version for 
query_version sync"
+                          << ", tablet_id=" << tablet_id() << ", 
old_max_version=" << _max_version
+                          << ", stale_version=" << stale_version
+                          << ", query_version=" << options.query_version;
+                _max_version = stale_version;
+            }
+        });
         auto lock_start = std::chrono::steady_clock::now();
         std::shared_lock rlock(_meta_lock);
         if (stats) {
diff --git a/be/test/cloud/cloud_schema_change_job_test.cpp 
b/be/test/cloud/cloud_schema_change_job_test.cpp
index 972ff2af255..74b27bed7ad 100644
--- a/be/test/cloud/cloud_schema_change_job_test.cpp
+++ b/be/test/cloud/cloud_schema_change_job_test.cpp
@@ -205,6 +205,84 @@ TEST_F(CloudSchemaChangeJobTest, 
FillVersionHolesBeforeNewTabletRunning) {
     ASSERT_EQ(versions[1], Version(4, 4));
 }
 
+TEST_F(CloudSchemaChangeJobTest, 
RefreshBaseTabletBeforeRegisteringSchemaChangeJob) {
+    int64_t base_tablet_id = 50001;
+    int64_t new_tablet_id = 50002;
+
+    TabletMetaSharedPtr base_meta(new TabletMeta(
+            1, 2, base_tablet_id, base_tablet_id + 100, 4, 5, TTabletSchema(), 
6, {{7, 8}},
+            UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, 
TCompressionType::LZ4F));
+    TabletMetaSharedPtr new_meta(new TabletMeta(
+            1, 2, new_tablet_id, new_tablet_id + 100, 4, 5, TTabletSchema(), 
6, {{7, 8}},
+            UniqueId(11, 12), TTabletType::TABLET_TYPE_DISK, 
TCompressionType::LZ4F));
+
+    auto base_tablet = std::make_shared<CloudTablet>(_engine, 
std::move(base_meta));
+    auto new_tablet = std::make_shared<CloudTablet>(_engine, 
std::move(new_meta));
+    static_cast<void>(new_tablet->set_tablet_state(TABLET_NOTREADY));
+
+    auto base_initial_rowset = create_rowset(base_tablet->tablet_schema(), 
base_tablet_id, 2, 6);
+    auto base_latest_rowset = create_rowset(base_tablet->tablet_schema(), 
base_tablet_id, 7, 10);
+
+    auto* sp = SyncPoint::get_instance();
+    sp->clear_all_call_backs();
+    sp->enable_processing();
+
+    sp->set_call_back("CloudMetaMgr::get_tablet_meta", [&](auto&& args) {
+        auto tablet_id = try_any_cast<int64_t>(args[0]);
+        auto* meta_ptr = try_any_cast<TabletMetaSharedPtr*>(args[1]);
+        if (tablet_id == base_tablet_id) {
+            *meta_ptr = base_tablet->tablet_meta();
+        } else if (tablet_id == new_tablet_id) {
+            *meta_ptr = new_tablet->tablet_meta();
+        }
+        try_any_cast_ret<Status>(args)->second = true;
+    });
+
+    int base_sync_count = 0;
+    sp->set_call_back("CloudMetaMgr::sync_tablet_rowsets", [&](auto&& outcome) 
{
+        auto* tablet = try_any_cast<CloudTablet*>(outcome[0]);
+        if (tablet->tablet_id() == base_tablet_id) {
+            ++base_sync_count;
+            std::unique_lock lock(tablet->get_header_lock());
+            if (base_sync_count == 1) {
+                tablet->add_rowsets({base_initial_rowset}, false, lock);
+            } else {
+                tablet->add_rowsets({base_latest_rowset}, false, lock);
+            }
+        }
+        auto* pairs = try_any_cast_ret<Status>(outcome);
+        pairs->second = true;
+        pairs->first = Status::OK();
+    });
+
+    int64_t prepared_alter_version = -1;
+    sp->set_call_back("CloudMetaMgr::prepare_tablet_job", [&](auto&& outcome) {
+        auto job = try_any_cast<cloud::TabletJobInfoPB>(outcome[0]);
+        ASSERT_TRUE(job.has_schema_change());
+        prepared_alter_version = job.schema_change().alter_version();
+
+        auto* pairs = try_any_cast_ret<Status>(outcome);
+        pairs->second = true;
+        pairs->first = Status::InternalError("mock job already success");
+
+        auto* resp = try_any_cast<cloud::StartTabletJobResponse*>(outcome[1]);
+        resp->mutable_status()->set_code(cloud::JOB_ALREADY_SUCCESS);
+    });
+
+    TAlterTabletReqV2 request;
+    request.base_tablet_id = base_tablet_id;
+    request.new_tablet_id = new_tablet_id;
+    request.alter_version = 4;
+    request.__set_alter_tablet_type(TAlterTabletType::SCHEMA_CHANGE);
+
+    CloudSchemaChangeJob sc_job(_engine, "test_refresh_base_tablet_before_sc", 
9999999999);
+    auto status = sc_job.process_alter_tablet(request);
+
+    ASSERT_TRUE(status.ok()) << status.to_string();
+    ASSERT_EQ(base_sync_count, 2);
+    ASSERT_EQ(prepared_alter_version, 10);
+}
+
 // Test: cross-V1 compaction detected → abort SC job → return 
SC_COMPACTION_CONFLICT
 TEST_F(CloudSchemaChangeJobTest, CrossV1CompactionDetected) {
     int64_t base_tablet_id = 10001;
diff --git 
a/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_stale_base_refresh.groovy
 
b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_stale_base_refresh.groovy
new file mode 100644
index 00000000000..679385562b5
--- /dev/null
+++ 
b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_stale_base_refresh.groovy
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Test: schema change retry refreshes base tablet rowsets before registering 
a fresh V1.
+//
+// Timeline:
+//   1. Block SC, insert data, and let new tablet compaction create a cross-V1 
rowset.
+//   2. Force V1=6 once to prove the cross-V1 retry path is active.
+//   3. Disable the V1 override, but inject stale local base max=6 only for 
query_version sync.
+//   4. Retry must refresh the base tablet and finish with the latest V1.
+//
+// The stale-local-max debug point only fires when the caller uses 
SyncOptions.query_version.
+// Old code capped SC base sync by request.alter_version, so it keeps V1 stale 
and cannot finish.
+// The fixed path does not set query_version, so it refreshes from 
meta-service and succeeds.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite('test_sc_compaction_cross_v1_stale_base_refresh', 'docker') {
+
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    options.enableDebugPoints()
+    options.beConfigs += ["enable_java_support=false"]
+    options.beConfigs += ["enable_new_tablet_do_compaction=true"]
+    options.beConfigs += ["alter_tablet_worker_count=1"]
+    options.beConfigs += ["cumulative_compaction_min_deltas=2"]
+    options.beNum = 1
+    options.feConfigs += ["http_port=8030"]
+    options.feConfigs += ["rpc_port=9020"]
+    options.feConfigs += ["query_port=9030"]
+    options.feConfigs += ["edit_log_port=9010"]
+    options.feConfigs += ["enable_schema_change_retry=true"]
+    options.feConfigs += ["schema_change_max_retry_time=10"]
+
+    docker(options) {
+        def getJobState = {
+            def result = sql """
+                SHOW ALTER TABLE COLUMN
+                WHERE IndexName='sc_cross_v1_stale_base_refresh_test'
+                ORDER BY createtime DESC LIMIT 1
+            """
+            logger.info("getJobState: ${result}")
+            return result[0][9]
+        }
+
+        sql "DROP TABLE IF EXISTS sc_cross_v1_stale_base_refresh_test"
+        sql """
+            CREATE TABLE sc_cross_v1_stale_base_refresh_test (
+                k1 int NOT NULL,
+                v1 varchar(100) NOT NULL,
+                v2 int NOT NULL
+            )
+            DISTRIBUTED BY HASH(k1) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1"
+            )
+        """
+
+        def tablets = sql_return_maparray "SHOW TABLETS FROM 
sc_cross_v1_stale_base_refresh_test"
+        assertEquals(1, tablets.size())
+        def baseTabletId = tablets[0].TabletId.toString()
+        logger.info("base tablet id for stale refresh test: ${baseTabletId}")
+
+        for (int i = 0; i < 3; i++) {
+            StringBuilder sb = new StringBuilder()
+            sb.append("INSERT INTO sc_cross_v1_stale_base_refresh_test VALUES 
")
+            for (int j = 0; j < 20; j++) {
+                if (j > 0) {
+                    sb.append(", ")
+                }
+                def key = i * 20 + j + 1
+                sb.append("(${key}, 'val_${key}', ${key * 10})")
+            }
+            sql sb.toString()
+        }
+        assertEquals(60L, (sql "SELECT count(*) FROM 
sc_cross_v1_stale_base_refresh_test")[0][0])
+
+        def scBlock = 'CloudSchemaChangeJob::process_alter_tablet.block'
+        def overrideDP = 
'CloudSchemaChangeJob::process_alter_tablet.override_base_max_version'
+        def staleMaxDP = 
'CloudTablet::sync_rowsets.stale_local_max_for_query_version'
+
+        try {
+            GetDebugPoint().enableDebugPointForAllBEs(scBlock)
+            try {
+                sql "ALTER TABLE sc_cross_v1_stale_base_refresh_test MODIFY 
COLUMN v2 bigint"
+                sleep(10000)
+                assertEquals("RUNNING", getJobState())
+
+                for (int i = 0; i < 6; i++) {
+                    StringBuilder sb = new StringBuilder()
+                    sb.append("INSERT INTO sc_cross_v1_stale_base_refresh_test 
VALUES ")
+                    for (int j = 0; j < 10; j++) {
+                        if (j > 0) {
+                            sb.append(", ")
+                        }
+                        def key = 100 + i * 10 + j + 1
+                        sb.append("(${key}, 'new_${key}', ${key * 10})")
+                    }
+                    sql sb.toString()
+                }
+
+                sleep(30000)
+                GetDebugPoint().enableDebugPointForAllBEs(overrideDP, 
[version: 6])
+            } finally {
+                GetDebugPoint().disableDebugPointForAllBEs(scBlock)
+            }
+
+            sleep(10000)
+            assertEquals("RUNNING", getJobState(),
+                    "SC should still be RUNNING while V1 is forced to cross 
the compacted rowset")
+
+            GetDebugPoint().enableDebugPointForAllBEs(
+                    staleMaxDP, [tablet_id: baseTabletId, version: 6])
+            GetDebugPoint().disableDebugPointForAllBEs(overrideDP)
+
+            int maxTries = 180
+            def finalState = ""
+            while (maxTries-- > 0) {
+                finalState = getJobState()
+                if (finalState == "FINISHED" || finalState == "CANCELLED") {
+                    break
+                }
+                sleep(1000)
+            }
+
+            logger.info("SC final state after stale base refresh retry: 
${finalState}")
+            assertEquals("FINISHED", finalState)
+
+            assertEquals(120L,
+                    (sql "SELECT count(*) FROM 
sc_cross_v1_stale_base_refresh_test")[0][0])
+
+            def columns = sql "DESC sc_cross_v1_stale_base_refresh_test"
+            def v2Col = columns.find { it[0] == "v2" }
+            assertTrue(v2Col[1].toString().toLowerCase().contains("bigint"),
+                    "v2 column should be bigint after schema change, got: 
${v2Col[1]}")
+
+            def backends = sql_return_maparray("SHOW BACKENDS")
+            assertTrue(backends.every { it.Alive.toString() == "true" },
+                    "BE should be alive after stale base refresh retry")
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to