This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new b6f61e99082 branch-4.1: [fix](cloud) Refresh base tablet before schema change V1 #64312 (#64430)
b6f61e99082 is described below

commit b6f61e99082e0b66036924a060f039c8ca8fa8f2
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jun 16 16:06:18 2026 +0800

    branch-4.1: [fix](cloud) Refresh base tablet before schema change V1 #64312 (#64430)
    
    Cherry-picked from #64312
    
    Co-authored-by: Jimmy <[email protected]>
---
 be/src/cloud/cloud_schema_change_job.cpp           |   3 +-
 be/src/cloud/cloud_tablet.cpp                      |  12 ++
 be/test/cloud/cloud_schema_change_job_test.cpp     |  78 ++++++++++
 ...c_compaction_cross_v1_stale_base_refresh.groovy | 158 +++++++++++++++++++++
 4 files changed, 250 insertions(+), 1 deletion(-)

diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp
index e6b83a1d5fe..87d11264d10 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -105,7 +105,8 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
     }
     // MUST sync rowsets before capturing rowset readers and building DeleteHandler
     SyncOptions options;
-    options.query_version = request.alter_version;
+    // The SC boundary (V1) must be calculated from the latest visible rowsets of the base
+    // tablet. Do not cap this sync by request.alter_version, which may be stale across retries.
     RETURN_IF_ERROR(_base_tablet->sync_rowsets(options));
     // ATTN: Only convert rowsets of version larger than 1, MUST let the new tablet cache have rowset [0-1]
     _output_cumulative_point = _base_tablet->cumulative_layer_point();
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 92ace798789..efd766a4a81 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -289,6 +289,18 @@ Status CloudTablet::sync_rowsets(const SyncOptions& options, SyncRowsetStats* st
     RETURN_IF_ERROR(sync_if_not_running(stats));
 
     if (options.query_version > 0) {
+        
DBUG_EXECUTE_IF("CloudTablet::sync_rowsets.stale_local_max_for_query_version", {
+            auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+            auto stale_version = dp->param<int64_t>("version", -1);
+            if (target_tablet_id == tablet_id() && stale_version >= 0) {
+                std::unique_lock wlock(_meta_lock);
+                LOG(INFO) << "override cloud tablet local max_version for 
query_version sync"
+                          << ", tablet_id=" << tablet_id() << ", 
old_max_version=" << _max_version
+                          << ", stale_version=" << stale_version
+                          << ", query_version=" << options.query_version;
+                _max_version = stale_version;
+            }
+        });
         auto lock_start = std::chrono::steady_clock::now();
         std::shared_lock rlock(_meta_lock);
         if (stats) {
diff --git a/be/test/cloud/cloud_schema_change_job_test.cpp b/be/test/cloud/cloud_schema_change_job_test.cpp
index 972ff2af255..74b27bed7ad 100644
--- a/be/test/cloud/cloud_schema_change_job_test.cpp
+++ b/be/test/cloud/cloud_schema_change_job_test.cpp
@@ -205,6 +205,84 @@ TEST_F(CloudSchemaChangeJobTest, FillVersionHolesBeforeNewTabletRunning) {
     ASSERT_EQ(versions[1], Version(4, 4));
 }
 
+TEST_F(CloudSchemaChangeJobTest, 
RefreshBaseTabletBeforeRegisteringSchemaChangeJob) {
+    int64_t base_tablet_id = 50001;
+    int64_t new_tablet_id = 50002;
+
+    TabletMetaSharedPtr base_meta(new TabletMeta(
+            1, 2, base_tablet_id, base_tablet_id + 100, 4, 5, TTabletSchema(), 
6, {{7, 8}},
+            UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, 
TCompressionType::LZ4F));
+    TabletMetaSharedPtr new_meta(new TabletMeta(
+            1, 2, new_tablet_id, new_tablet_id + 100, 4, 5, TTabletSchema(), 
6, {{7, 8}},
+            UniqueId(11, 12), TTabletType::TABLET_TYPE_DISK, 
TCompressionType::LZ4F));
+
+    auto base_tablet = std::make_shared<CloudTablet>(_engine, 
std::move(base_meta));
+    auto new_tablet = std::make_shared<CloudTablet>(_engine, 
std::move(new_meta));
+    static_cast<void>(new_tablet->set_tablet_state(TABLET_NOTREADY));
+
+    auto base_initial_rowset = create_rowset(base_tablet->tablet_schema(), 
base_tablet_id, 2, 6);
+    auto base_latest_rowset = create_rowset(base_tablet->tablet_schema(), 
base_tablet_id, 7, 10);
+
+    auto* sp = SyncPoint::get_instance();
+    sp->clear_all_call_backs();
+    sp->enable_processing();
+
+    sp->set_call_back("CloudMetaMgr::get_tablet_meta", [&](auto&& args) {
+        auto tablet_id = try_any_cast<int64_t>(args[0]);
+        auto* meta_ptr = try_any_cast<TabletMetaSharedPtr*>(args[1]);
+        if (tablet_id == base_tablet_id) {
+            *meta_ptr = base_tablet->tablet_meta();
+        } else if (tablet_id == new_tablet_id) {
+            *meta_ptr = new_tablet->tablet_meta();
+        }
+        try_any_cast_ret<Status>(args)->second = true;
+    });
+
+    int base_sync_count = 0;
+    sp->set_call_back("CloudMetaMgr::sync_tablet_rowsets", [&](auto&& outcome) 
{
+        auto* tablet = try_any_cast<CloudTablet*>(outcome[0]);
+        if (tablet->tablet_id() == base_tablet_id) {
+            ++base_sync_count;
+            std::unique_lock lock(tablet->get_header_lock());
+            if (base_sync_count == 1) {
+                tablet->add_rowsets({base_initial_rowset}, false, lock);
+            } else {
+                tablet->add_rowsets({base_latest_rowset}, false, lock);
+            }
+        }
+        auto* pairs = try_any_cast_ret<Status>(outcome);
+        pairs->second = true;
+        pairs->first = Status::OK();
+    });
+
+    int64_t prepared_alter_version = -1;
+    sp->set_call_back("CloudMetaMgr::prepare_tablet_job", [&](auto&& outcome) {
+        auto job = try_any_cast<cloud::TabletJobInfoPB>(outcome[0]);
+        ASSERT_TRUE(job.has_schema_change());
+        prepared_alter_version = job.schema_change().alter_version();
+
+        auto* pairs = try_any_cast_ret<Status>(outcome);
+        pairs->second = true;
+        pairs->first = Status::InternalError("mock job already success");
+
+        auto* resp = try_any_cast<cloud::StartTabletJobResponse*>(outcome[1]);
+        resp->mutable_status()->set_code(cloud::JOB_ALREADY_SUCCESS);
+    });
+
+    TAlterTabletReqV2 request;
+    request.base_tablet_id = base_tablet_id;
+    request.new_tablet_id = new_tablet_id;
+    request.alter_version = 4;
+    request.__set_alter_tablet_type(TAlterTabletType::SCHEMA_CHANGE);
+
+    CloudSchemaChangeJob sc_job(_engine, "test_refresh_base_tablet_before_sc", 
9999999999);
+    auto status = sc_job.process_alter_tablet(request);
+
+    ASSERT_TRUE(status.ok()) << status.to_string();
+    ASSERT_EQ(base_sync_count, 2);
+    ASSERT_EQ(prepared_alter_version, 10);
+}
+
 // Test: cross-V1 compaction detected → abort SC job → return SC_COMPACTION_CONFLICT
 TEST_F(CloudSchemaChangeJobTest, CrossV1CompactionDetected) {
     int64_t base_tablet_id = 10001;
diff --git a/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_stale_base_refresh.groovy b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_stale_base_refresh.groovy
new file mode 100644
index 00000000000..679385562b5
--- /dev/null
+++ b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_stale_base_refresh.groovy
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Test: schema change retry refreshes base tablet rowsets before registering a fresh V1.
+//
+// Timeline:
+//   1. Block SC, insert data, and let new tablet compaction create a cross-V1 rowset.
+//   2. Force V1=6 once to prove the cross-V1 retry path is active.
+//   3. Disable the V1 override, but inject stale local base max=6 only for query_version sync.
+//   4. Retry must refresh the base tablet and finish with the latest V1.
+//
+// The stale-local-max debug point only fires when the caller uses SyncOptions.query_version.
+// Old code capped SC base sync by request.alter_version, so it keeps V1 stale and cannot finish.
+// The fixed path does not set query_version, so it refreshes from meta-service and succeeds.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite('test_sc_compaction_cross_v1_stale_base_refresh', 'docker') {
+
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    options.enableDebugPoints()
+    options.beConfigs += ["enable_java_support=false"]
+    options.beConfigs += ["enable_new_tablet_do_compaction=true"]
+    options.beConfigs += ["alter_tablet_worker_count=1"]
+    options.beConfigs += ["cumulative_compaction_min_deltas=2"]
+    options.beNum = 1
+    options.feConfigs += ["http_port=8030"]
+    options.feConfigs += ["rpc_port=9020"]
+    options.feConfigs += ["query_port=9030"]
+    options.feConfigs += ["edit_log_port=9010"]
+    options.feConfigs += ["enable_schema_change_retry=true"]
+    options.feConfigs += ["schema_change_max_retry_time=10"]
+
+    docker(options) {
+        def getJobState = {
+            def result = sql """
+                SHOW ALTER TABLE COLUMN
+                WHERE IndexName='sc_cross_v1_stale_base_refresh_test'
+                ORDER BY createtime DESC LIMIT 1
+            """
+            logger.info("getJobState: ${result}")
+            return result[0][9]
+        }
+
+        sql "DROP TABLE IF EXISTS sc_cross_v1_stale_base_refresh_test"
+        sql """
+            CREATE TABLE sc_cross_v1_stale_base_refresh_test (
+                k1 int NOT NULL,
+                v1 varchar(100) NOT NULL,
+                v2 int NOT NULL
+            )
+            DISTRIBUTED BY HASH(k1) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1"
+            )
+        """
+
+        def tablets = sql_return_maparray "SHOW TABLETS FROM 
sc_cross_v1_stale_base_refresh_test"
+        assertEquals(1, tablets.size())
+        def baseTabletId = tablets[0].TabletId.toString()
+        logger.info("base tablet id for stale refresh test: ${baseTabletId}")
+
+        for (int i = 0; i < 3; i++) {
+            StringBuilder sb = new StringBuilder()
+            sb.append("INSERT INTO sc_cross_v1_stale_base_refresh_test VALUES 
")
+            for (int j = 0; j < 20; j++) {
+                if (j > 0) {
+                    sb.append(", ")
+                }
+                def key = i * 20 + j + 1
+                sb.append("(${key}, 'val_${key}', ${key * 10})")
+            }
+            sql sb.toString()
+        }
+        assertEquals(60L, (sql "SELECT count(*) FROM 
sc_cross_v1_stale_base_refresh_test")[0][0])
+
+        def scBlock = 'CloudSchemaChangeJob::process_alter_tablet.block'
+        def overrideDP = 
'CloudSchemaChangeJob::process_alter_tablet.override_base_max_version'
+        def staleMaxDP = 
'CloudTablet::sync_rowsets.stale_local_max_for_query_version'
+
+        try {
+            GetDebugPoint().enableDebugPointForAllBEs(scBlock)
+            try {
+                sql "ALTER TABLE sc_cross_v1_stale_base_refresh_test MODIFY 
COLUMN v2 bigint"
+                sleep(10000)
+                assertEquals("RUNNING", getJobState())
+
+                for (int i = 0; i < 6; i++) {
+                    StringBuilder sb = new StringBuilder()
+                    sb.append("INSERT INTO sc_cross_v1_stale_base_refresh_test 
VALUES ")
+                    for (int j = 0; j < 10; j++) {
+                        if (j > 0) {
+                            sb.append(", ")
+                        }
+                        def key = 100 + i * 10 + j + 1
+                        sb.append("(${key}, 'new_${key}', ${key * 10})")
+                    }
+                    sql sb.toString()
+                }
+
+                sleep(30000)
+                GetDebugPoint().enableDebugPointForAllBEs(overrideDP, 
[version: 6])
+            } finally {
+                GetDebugPoint().disableDebugPointForAllBEs(scBlock)
+            }
+
+            sleep(10000)
+            assertEquals("RUNNING", getJobState(),
+                    "SC should still be RUNNING while V1 is forced to cross 
the compacted rowset")
+
+            GetDebugPoint().enableDebugPointForAllBEs(
+                    staleMaxDP, [tablet_id: baseTabletId, version: 6])
+            GetDebugPoint().disableDebugPointForAllBEs(overrideDP)
+
+            int maxTries = 180
+            def finalState = ""
+            while (maxTries-- > 0) {
+                finalState = getJobState()
+                if (finalState == "FINISHED" || finalState == "CANCELLED") {
+                    break
+                }
+                sleep(1000)
+            }
+
+            logger.info("SC final state after stale base refresh retry: 
${finalState}")
+            assertEquals("FINISHED", finalState)
+
+            assertEquals(120L,
+                    (sql "SELECT count(*) FROM 
sc_cross_v1_stale_base_refresh_test")[0][0])
+
+            def columns = sql "DESC sc_cross_v1_stale_base_refresh_test"
+            def v2Col = columns.find { it[0] == "v2" }
+            assertTrue(v2Col[1].toString().toLowerCase().contains("bigint"),
+                    "v2 column should be bigint after schema change, got: 
${v2Col[1]}")
+
+            def backends = sql_return_maparray("SHOW BACKENDS")
+            assertTrue(backends.every { it.Alive.toString() == "true" },
+                    "BE should be alive after stale base refresh retry")
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to