This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 03c8cae0dbc branch-4.0: (cloud) Hold table write lock across 
first-time dynamic partition setup to prevent CREATE MV race #62755 (#62862)
03c8cae0dbc is described below

commit 03c8cae0dbc336cc2ae4881602a85b68fe01a818
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu May 7 17:13:25 2026 +0800

    branch-4.0: (cloud) Hold table write lock across first-time dynamic 
partition setup to prevent CREATE MV race #62755 (#62862)
    
    Cherry-picked from #62755
    
    Co-authored-by: deardeng <[email protected]>
---
 .../apache/doris/datasource/InternalCatalog.java   | 104 +++++++++------
 .../test_create_table_and_create_mv_race.groovy    | 143 +++++++++++++++++++++
 2 files changed, 208 insertions(+), 39 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 02f9ab3c672..82ac3d992d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -3188,6 +3188,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
             }
 
             Pair<Boolean, Boolean> result;
+            boolean holdTableLock = false;
             db.writeLockOrDdlException();
             try {
                 // db name not changed
@@ -3196,51 +3197,76 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                 }
                 // register table, write create table edit log
                 result = db.createTableWithoutLock(olapTable, false, 
createTableInfo.isIfNotExists());
+                if (!result.second) {
+                    olapTable.writeLock();
+                    holdTableLock = true;
+                }
             } finally {
                 db.writeUnlock();
             }
-            if (!result.first) {
-                
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableShowName);
-            }
+            try {
+                if (!result.first) {
+                    
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableShowName);
+                }
 
-            if (result.second) { // table already exists
-                if (Env.getCurrentColocateIndex().isColocateTable(tableId)) {
-                    // if this is a colocate table, its table id is already 
added to colocate group
-                    // so we should remove the tableId here
-                    Env.getCurrentColocateIndex().removeTable(tableId);
+                if (result.second) { // table already exists
+                    if 
(Env.getCurrentColocateIndex().isColocateTable(tableId)) {
+                        // if this is a colocate table, its table id is 
already added to colocate group
+                        // so we should remove the tableId here
+                        Env.getCurrentColocateIndex().removeTable(tableId);
+                    }
+                    for (Long tabletId : tabletIdSet) {
+                        Env.getCurrentInvertedIndex().deleteTablet(tabletId);
+                    }
+                    LOG.info("duplicate create table[{};{}] in db[{};{}], skip 
next steps",
+                            tableName, tableId, db.getName(), db.getId());
+                } else {
+                    // table did not exist: db.createTableWithoutLock above 
wrote the create-table edit log.
+                    hadLogEditCreateTable = true;
+
+                    // we have added these index to memory, only need to 
persist here
+                    if 
(Env.getCurrentColocateIndex().isColocateTable(tableId)) {
+                        GroupId groupId = 
Env.getCurrentColocateIndex().getGroup(tableId);
+                        Map<Tag, List<List<Long>>> backendsPerBucketSeq = 
Env.getCurrentColocateIndex()
+                                .getBackendsPerBucketSeq(groupId);
+                        ColocatePersistInfo info = 
ColocatePersistInfo.createForAddTable(groupId, tableId,
+                                backendsPerBucketSeq);
+                        
Env.getCurrentEnv().getEditLog().logColocateAddTable(info);
+                    }
+                    LOG.info("successfully create table[{};{}] in db[{};{}]",
+                            tableName, tableId, db.getName(), db.getId());
+
+                    if 
(DebugPointUtil.isEnable("FE.createOlapTable.beforeFirstTimeDynamicPartition")) 
{
+                        long sleepMs = DebugPointUtil.getDebugParamOrDefault(
+                                
"FE.createOlapTable.beforeFirstTimeDynamicPartition", "sleepMs", 0L);
+                        if (sleepMs > 0) {
+                            LOG.info("debug point 
FE.createOlapTable.beforeFirstTimeDynamicPartition, sleep {}ms",
+                                    sleepMs);
+                            try {
+                                Thread.sleep(sleepMs);
+                            } catch (InterruptedException ignore) {
+                                Thread.currentThread().interrupt();
+                            }
+                        }
+                    }
+
+                    Env.getCurrentEnv().getDynamicPartitionScheduler()
+                            .executeDynamicPartitionFirstTime(db.getId(), 
olapTable.getId());
+                    // register or remove table from DynamicPartition after 
table created
+                    
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), 
olapTable, false);
+                    Env.getCurrentEnv().getDynamicPartitionScheduler()
+                            .createOrUpdateRuntimeInfo(tableId, 
DynamicPartitionScheduler.LAST_UPDATE_TIME,
+                                    TimeUtils.getCurrentFormatTime());
                 }
-                for (Long tabletId : tabletIdSet) {
-                    Env.getCurrentInvertedIndex().deleteTablet(tabletId);
+
+                if (DebugPointUtil.isEnable("FE.createOlapTable.exception")) {
+                    LOG.info("debug point FE.createOlapTable.exception, throw 
e");
+                    throw new DdlException("debug point 
FE.createOlapTable.exception");
+                }
+            } finally {
+                if (holdTableLock) {
+                    olapTable.writeUnlock();
                 }
-                LOG.info("duplicate create table[{};{}] in db[{};{}], skip 
next steps",
-                        tableName, tableId, db.getName(), db.getId());
-            } else {
-                // if table not exists, then db.createTableWithLock will write 
an editlog.
-                hadLogEditCreateTable = true;
-
-                // we have added these index to memory, only need to persist 
here
-                if (Env.getCurrentColocateIndex().isColocateTable(tableId)) {
-                    GroupId groupId = 
Env.getCurrentColocateIndex().getGroup(tableId);
-                    Map<Tag, List<List<Long>>> backendsPerBucketSeq = 
Env.getCurrentColocateIndex()
-                            .getBackendsPerBucketSeq(groupId);
-                    ColocatePersistInfo info = 
ColocatePersistInfo.createForAddTable(groupId, tableId,
-                            backendsPerBucketSeq);
-                    Env.getCurrentEnv().getEditLog().logColocateAddTable(info);
-                }
-                LOG.info("successfully create table[{};{}] in db[{};{}]",
-                        tableName, tableId, db.getName(), db.getId());
-                Env.getCurrentEnv().getDynamicPartitionScheduler()
-                    .executeDynamicPartitionFirstTime(db.getId(), 
olapTable.getId());
-                // register or remove table from DynamicPartition after table 
created
-                
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), 
olapTable, false);
-                Env.getCurrentEnv().getDynamicPartitionScheduler()
-                        .createOrUpdateRuntimeInfo(tableId, 
DynamicPartitionScheduler.LAST_UPDATE_TIME,
-                        TimeUtils.getCurrentFormatTime());
-            }
-
-            if (DebugPointUtil.isEnable("FE.createOlapTable.exception")) {
-                LOG.info("debug point FE.createOlapTable.exception, throw e");
-                throw new DdlException("debug point 
FE.createOlapTable.exception");
             }
         } catch (DdlException e) {
             LOG.warn("create table failed {} - {}", tabletIdSet, 
e.getMessage());
diff --git 
a/regression-test/suites/cloud_p0/partition/test_create_table_and_create_mv_race.groovy
 
b/regression-test/suites/cloud_p0/partition/test_create_table_and_create_mv_race.groovy
new file mode 100644
index 00000000000..47c71f0c1c2
--- /dev/null
+++ 
b/regression-test/suites/cloud_p0/partition/test_create_table_and_create_mv_race.groovy
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+
+// Regression test for the cloud-specific race where a concurrent
+// CREATE MATERIALIZED VIEW slips in after OP_CREATE_TABLE is journaled
+// but before first-time dynamic partition entries are journaled, causing
+// the rollup job to reference partitions that never appear in the journal.
+// Replay later NPEs at RollupJobV2.addTabletToInvertedIndex.
+//
+// The fix holds olapTable.writeLock across the whole first-time dynamic
+// partition setup so CREATE MV must wait until the table is fully built.
+suite("test_create_table_and_create_mv_race", 'p0, docker') {
+    if (!isCloudMode()) {
+        return
+    }
+    def options = new ClusterOptions()
+    options.enableDebugPoints()
+    options.setFeNum(3)
+    options.feConfigs.add('sys_log_verbose_modules=org')
+    options.setBeNum(1)
+    options.cloudMode = true
+
+    docker(options) {
+        sql """set enable_sql_cache=false"""
+
+        def tbl = 'test_create_table_and_create_mv_race_tbl'
+        def mvName = 'test_create_table_and_create_mv_race_mv'
+
+        sql "DROP TABLE IF EXISTS ${tbl}"
+
+        // Widen the race window: slow down first-time dynamic partition
+        // so the concurrent CREATE MV has time to arrive.
+        // With the fix, CREATE MV will block on olapTable.writeLock() and
+        // only run after the table is fully built.
+        //
+        // params serialize to HTTP query strings on the wire, so values
+        // must be String-convertible (Groovy handles the coercion).
+        cluster.injectDebugPoints(NodeType.FE, [
+                'FE.createOlapTable.beforeFirstTimeDynamicPartition': 
[sleepMs: '10000']
+        ])
+
+        try {
+            def createDoneAt = new java.util.concurrent.atomic.AtomicLong(0L)
+            def mvDoneAt = new java.util.concurrent.atomic.AtomicLong(0L)
+
+            def createFuture = thread('create-table') {
+                sql """
+                    CREATE TABLE ${tbl} (
+                        user_id     BIGINT,
+                        create_dt   datetime,
+                        amount      BIGINT
+                    )
+                    DUPLICATE KEY(user_id, create_dt)
+                    PARTITION BY RANGE(create_dt) ()
+                    DISTRIBUTED BY HASH(user_id) BUCKETS 1
+                    PROPERTIES (
+                        "replication_num" = "1",
+                        "dynamic_partition.enable" = "true",
+                        "dynamic_partition.time_unit" = "DAY",
+                        "dynamic_partition.start" = "-2",
+                        "dynamic_partition.end" = "2",
+                        "dynamic_partition.prefix" = "p",
+                        "dynamic_partition.create_history_partition" = "true"
+                    )
+                """
+                createDoneAt.set(System.currentTimeMillis())
+            }
+
+            // Give CREATE TABLE time to reach the injected sleep.
+            sleep(2000)
+
+            def mvFuture = thread('create-mv') {
+                // If CREATE MV fires before the table exists in the 
namespace, retry a few times.
+                def attempts = 30
+                while (attempts-- > 0) {
+                    try {
+                        sql """
+                            CREATE MATERIALIZED VIEW ${mvName} AS
+                            SELECT user_id AS mv_user_id, sum(amount) AS 
mv_sum_amount
+                            FROM ${tbl}
+                            GROUP BY user_id
+                        """
+                        break
+                    } catch (Exception e) {
+                        def msg = e.getMessage() ?: ''
+                        if (msg.contains("Unknown table") || 
msg.contains("does not exist")) {
+                            sleep(200)
+                            continue
+                        }
+                        throw e
+                    }
+                }
+                mvDoneAt.set(System.currentTimeMillis())
+            }
+
+            createFuture.get()
+            mvFuture.get()
+
+            // Correctness invariant: CREATE MV must have finished AFTER 
CREATE TABLE.
+            // If the lock is missing, CREATE MV would return first (it's 
cheap) while
+            // CREATE TABLE is still inside the injected 10s sleep.
+            def ct = createDoneAt.get()
+            def mt = mvDoneAt.get()
+            assert ct > 0 && mt > 0, "both futures should have completed"
+            assert mt >= ct, "CREATE MV (${mt}) must finish after CREATE TABLE 
(${ct})," +
+                    " otherwise the lock fix is missing and MV raced ahead of 
first-time dynamic partition setup"
+        } finally {
+            cluster.clearFrontendDebugPoints()
+        }
+
+        cluster.checkFeIsAlive(1, true)
+
+        // Verify the table has all dynamic partitions.
+        // dynamic_partition.start=-2, end=2, create_history_partition=true → 
5 partitions expected (the assert below only requires at least 3)
+        def partitions = sql "SHOW PARTITIONS FROM ${tbl}"
+        assert partitions.size() >= 3,
+                "dynamic_partition.start=-2/end=2 should have produced at 
least 3 partitions, got ${partitions.size()}"
+
+        // Sanity: an end-to-end insert into the base table works (MV existence is not checked here).
+        def now = new Date()
+        def dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
+        def today = dateFormat.format(now)
+        sql "INSERT INTO ${tbl} VALUES (1, '${today} 12:00:00', 100)"
+        def cnt = sql "SELECT count(*) FROM ${tbl}"
+        assert cnt[0][0] == 1L, "expected 1 row, got ${cnt[0][0]}"
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to