This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 01df440208f [fix](cache) Follower FE sql cache not invalidated on 
table metadata replay (#63612) (#63658)
01df440208f is described below

commit 01df440208ffb8012a749c3b26b9e52ad0fe7c39
Author: 924060929 <[email protected]>
AuthorDate: Tue May 26 22:05:53 2026 +0800

    [fix](cache) Follower FE sql cache not invalidated on table metadata replay (#63612) (#63658)
    
    ## Proposed changes
    
    Cherry-pick of #63612.
    
    Follower FEs replay metadata changes (e.g. `ALTER TABLE ... RENAME COLUMN`)
    but never invalidated the sql cache, so stale entries survived and queries
    returned wrong results. Because `enable_sql_cache` has defaulted to `true`
    since 4.0, every multi-FE deployment is affected.
    
    ### Two-layer fix
    
    | Layer | Mechanism | Scope | Crash window |
    |-------|-----------|-------|--------------|
    | 1 | **baseSchemaVersion check (new)** | OlapTable schema DDL (rename/add/drop column) | **None** |
    | 2 | **OP_TABLE_META_CHANGE journal (new, opt-in)** | All table types, all DDL | Tiny |
    
    **Layer 1** stores `OlapTable.getBaseSchemaVersion()` in cache entries and
    checks it on lookup: lazy validation with zero crash window that
    automatically covers every DDL that bumps the schema version.
    
    **Layer 2** broadcasts `OP_TABLE_META_CHANGE` via the journal on DDL, gated
    by the `enable_write_op_table_meta_change` FE config (default **false**):
    - `false`: master performs local cache invalidation only; no journal write
    - `true`: master invalidates locally and also broadcasts to all followers
    
    Layer 1 works regardless of the config setting.
---
 .../main/java/org/apache/doris/common/Config.java  |  12 ++
 .../main/java/org/apache/doris/alter/Alter.java    |   4 +-
 .../main/java/org/apache/doris/catalog/Env.java    |  42 +++++-
 .../doris/common/cache/NereidsSqlCacheManager.java |  28 ++--
 .../org/apache/doris/journal/JournalEntity.java    |   6 +
 .../org/apache/doris/nereids/SqlCacheContext.java  |   8 +-
 .../java/org/apache/doris/persist/EditLog.java     |   9 ++
 .../org/apache/doris/persist/OperationType.java    |   5 +
 .../org/apache/doris/persist/TableMetaChange.java  | 153 +++++++++++++++++++++
 9 files changed, 249 insertions(+), 18 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 0f70365b66a..35c5cadd824 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2452,6 +2452,18 @@ public class Config extends ConfigBase {
     )
     public static int sql_cache_manage_num = 100;
 
+    @ConfField(
+            mutable = true,
+            description = {
+                    "是否在 DDL 时写入 OP_TABLE_META_CHANGE edit log 通知 follower FE 
清理 sql cache。"
+                            + "默认 false,开启后 master DDL 会广播表元数据变更信号到所有 
follower",
+                    "Whether to write OP_TABLE_META_CHANGE edit log on DDL to 
notify follower FEs "
+                            + "to invalidate sql cache. Default false. When 
enabled, master DDL broadcasts "
+                            + "table metadata change signal to all followers"
+            }
+    )
+    public static boolean enable_write_op_table_meta_change = false;
+
     @ConfField(
             mutable = true,
             callbackClassString = 
"org.apache.doris.common.cache.NereidsSortedPartitionsCacheManager$UpdateConfig",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index 89c2d95f4ae..0987d4b70eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -72,7 +72,6 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.common.cache.NereidsSqlCacheManager;
 import org.apache.doris.common.util.DynamicPartitionUtil;
 import org.apache.doris.common.util.MetaLockUtils;
 import org.apache.doris.common.util.PropertyAnalyzer;
@@ -363,8 +362,7 @@ public class Alter {
 
         olapTable.writeLock();
         try {
-            NereidsSqlCacheManager sqlCacheManager = 
Env.getCurrentEnv().getSqlCacheManager();
-            sqlCacheManager.invalidateAboutTable(olapTable);
+            Env.getCurrentEnv().notifyTableMetaChange(olapTable);
         } finally {
             olapTable.writeUnlock();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 67a1d656051..9dfcbfc1fa5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -222,6 +222,7 @@ import org.apache.doris.persist.SetTableStatusOperationLog;
 import org.apache.doris.persist.Storage;
 import org.apache.doris.persist.StorageInfo;
 import org.apache.doris.persist.TableInfo;
+import org.apache.doris.persist.TableMetaChange;
 import org.apache.doris.persist.TablePropertyInfo;
 import org.apache.doris.persist.TableRenameColumnInfo;
 import org.apache.doris.persist.TruncateTableInfo;
@@ -7001,7 +7002,9 @@ public class Env {
                 LOG.warn("ignore set same state {} for table {}. is replay: 
{}.",
                             olapTable.getState(), tableName, isReplay);
             }
-            
Env.getCurrentEnv().getSqlCacheManager().invalidateAboutTable(olapTable);
+            if (!isReplay) {
+                notifyTableMetaChange(olapTable);
+            }
         } finally {
             olapTable.writeUnlock();
         }
@@ -7109,7 +7112,9 @@ public class Env {
                 LOG.info("set replica {} of tablet {} on backend {} as version 
{}, last success version {}, "
                         + "last failed version {}, update time {}. is replay: 
{}", replica.getId(), tabletId,
                         backendId, version, lastSuccessVersion, 
lastFailedVersion, updateTime, isReplay);
-                
Env.getCurrentEnv().getSqlCacheManager().invalidateAboutTable(table);
+                if (!isReplay) {
+                    notifyTableMetaChange(table);
+                }
             } finally {
                 table.writeUnlock();
             }
@@ -7190,7 +7195,9 @@ public class Env {
                         + " {}.", partitionId, oldVersion, visibleVersion, 
database, table, isReplay);
             }
 
-            
Env.getCurrentEnv().getSqlCacheManager().invalidateAboutTable(olapTable);
+            if (!isReplay) {
+                notifyTableMetaChange(olapTable);
+            }
         } finally {
             olapTable.writeUnlock();
         }
@@ -7435,6 +7442,35 @@ public class Env {
         return sortedPartitionsCacheManager;
     }
 
+    public void notifyTableMetaChange(TableIf table) {
+        if (table == null) {
+            return;
+        }
+        TableMetaChange change =
+                TableMetaChange.fromTable(table);
+        fanOutTableMetaChange(change);
+        if (isMaster() && editLog != null && 
Config.enable_write_op_table_meta_change) {
+            editLog.logTableMetaChange(change);
+        }
+    }
+
+    public void replayTableMetaChange(TableMetaChange change) {
+        if (change == null) {
+            return;
+        }
+        fanOutTableMetaChange(change);
+    }
+
+    private void fanOutTableMetaChange(TableMetaChange change) {
+        if (sqlCacheManager != null) {
+            sqlCacheManager.invalidateAboutTable(change);
+        }
+        if (sortedPartitionsCacheManager != null) {
+            sortedPartitionsCacheManager.invalidateTable(
+                    change.getCatalogName(), change.getDbName(), 
change.getTableName());
+        }
+    }
+
     public SplitSourceManager getSplitSourceManager() {
         return splitSourceManager;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java
index fe8f01b7254..af44c008b2e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java
@@ -59,6 +59,7 @@ import org.apache.doris.nereids.trees.plans.RelationId;
 import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
 import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.persist.TableMetaChange;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.Types.PUniqueId;
 import org.apache.doris.qe.ConnectContext;
@@ -112,29 +113,31 @@ public class NereidsSqlCacheManager {
     }
 
     public void invalidateAboutTable(TableIf tableIf) {
-        Set<String> invalidateKeys = new LinkedHashSet<>();
+        invalidateAboutTable(TableMetaChange.fromTable(tableIf));
+    }
+
+    public void invalidateAboutTable(TableMetaChange event) {
         FullTableName invalidateTableName = null;
-        DatabaseIf database = tableIf.getDatabase();
-        if (database != null) {
-            CatalogIf catalog = database.getCatalog();
-            if (catalog != null) {
-                invalidateTableName = new FullTableName(
-                        database.getCatalog().getName(), 
database.getFullName(), tableIf.getName()
-                );
-            }
+        if (event.getCatalogName() != null && event.getDbName() != null && 
event.getTableName() != null) {
+            invalidateTableName = new FullTableName(
+                    event.getCatalogName(), event.getDbName(), 
event.getTableName());
         }
 
+        Set<String> invalidateKeys = new LinkedHashSet<>();
         for (Entry<String, SqlCacheContext> kv : sqlCaches.asMap().entrySet()) 
{
             String key = kv.getKey();
             SqlCacheContext context = kv.getValue();
+            if (context == null) {
+                continue;
+            }
             for (Entry<FullTableName, TableVersion> nameToVersion : 
context.getUsedTables().entrySet()) {
                 FullTableName tableName = nameToVersion.getKey();
                 TableVersion tableVersion = nameToVersion.getValue();
-                if (tableVersion.id == tableIf.getId()) {
+                if (tableVersion.id == event.getTableId()) {
                     invalidateKeys.add(key);
                     break;
                 }
-                if (tableName.equals(invalidateTableName)) {
+                if (invalidateTableName != null && 
tableName.equals(invalidateTableName)) {
                     invalidateKeys.add(key);
                     break;
                 }
@@ -465,6 +468,9 @@ public class NereidsSqlCacheManager {
                 if (currentTableVersion != cacheTableVersion) {
                     return IsChanged.CHANGED_AND_INVALIDATE_CACHE;
                 }
+                if (olapTable.getBaseSchemaVersion() != 
tableVersion.schemaVersion) {
+                    return IsChanged.CHANGED_AND_INVALIDATE_CACHE;
+                }
                 if (tableIf instanceof MTMV) {
                     // mtmv maybe access old data when grace_period > 0, we 
should disable cache at this case
                     long gracePeriod = ((MTMV) tableIf).getGracePeriod();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 7c7a4d4a6ae..954316c683b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -127,6 +127,7 @@ import org.apache.doris.persist.TableAddOrDropColumnsInfo;
 import org.apache.doris.persist.TableAddOrDropInvertedIndicesInfo;
 import org.apache.doris.persist.TableBranchOrTagInfo;
 import org.apache.doris.persist.TableInfo;
+import org.apache.doris.persist.TableMetaChange;
 import org.apache.doris.persist.TablePropertyInfo;
 import org.apache.doris.persist.TableRenameColumnInfo;
 import org.apache.doris.persist.TableStatsDeletionLog;
@@ -1011,6 +1012,11 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_TABLE_META_CHANGE: {
+                data = TableMetaChange.read(in);
+                isRead = true;
+                break;
+            }
             default: {
                 IOException e = new IOException();
                 LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
index e532ce611fe..8828c5b711b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
@@ -203,12 +203,17 @@ public class SqlCacheContext {
             LOG.warn("table {}, can not get version", tableIf.getName(), e);
         }
 
+        int schemaVersion = 0;
+        if (tableIf instanceof OlapTable) {
+            schemaVersion = ((OlapTable) tableIf).getBaseSchemaVersion();
+        }
         usedTables.put(
                 new FullTableName(database.getCatalog().getName(), 
database.getFullName(), tableIf.getName()),
                 new TableVersion(
                         tableIf.getId(),
                         version,
-                        tableIf.getType()
+                        tableIf.getType(),
+                        schemaVersion
                 )
         );
     }
@@ -593,6 +598,7 @@ public class SqlCacheContext {
         public final long id;
         public final long version;
         public final TableType type;
+        public final int schemaVersion;
     }
 
     /** CacheKeyType */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 4c42d5ccc06..638286d1543 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -1441,6 +1441,11 @@ public class EditLog {
                     // This log is only used to keep FE/MS cut point in 
journal timeline.
                     break;
                 }
+                case OperationType.OP_TABLE_META_CHANGE: {
+                    TableMetaChange op = (TableMetaChange) journal.getData();
+                    env.replayTableMetaChange(op);
+                    break;
+                }
                 default: {
                     IOException e = new IOException();
                     LOG.error("UNKNOWN Operation Type {}, log id: {}", opCode, 
logId, e);
@@ -2543,4 +2548,8 @@ public class EditLog {
     public long logMetaSyncPoint(CloudMetaSyncPoint syncPoint) {
         return logEdit(OperationType.OP_META_SYNC_POINT, syncPoint);
     }
+
+    public void logTableMetaChange(TableMetaChange op) {
+        logEdit(OperationType.OP_TABLE_META_CHANGE, op);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 23f1102be11..3178bd0bed8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -429,6 +429,11 @@ public class OperationType {
     public static final short OP_BEGIN_SNAPSHOT = 1100;
     public static final short OP_META_SYNC_POINT = 1101;
 
+    // Generic "an operation modified this table's metadata" signal broadcast 
from
+    // master to followers so that every FE-local cache keyed by table can be
+    // invalidated (NereidsSqlCacheManager, 
NereidsSortedPartitionsCacheManager, …).
+    public static final short OP_TABLE_META_CHANGE = 1102;
+
     /**
      * Get opcode name by op code.
      **/
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/TableMetaChange.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/TableMetaChange.java
new file mode 100644
index 00000000000..cc62417d92f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/TableMetaChange.java
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Persist payload for {@link OperationType#OP_TABLE_META_CHANGE}.
+ * Generic "an operation modified this table's metadata" signal that follower
+ * FEs use to invalidate any local FE caches keyed by the table (sql cache,
+ * sorted partition cache, and future per-table caches). This is about
+ * metadata mutations (schema/properties/partitions/etc.), not data writes.
+ * Carries both ids and names of catalog / database / table so each subscriber
+ * can match by id (preferred) or by full name (fallback, e.g. when the table
+ * has been concurrently dropped/recreated and the id no longer matches but
+ * the name still does). Also carries the master-side timestamp so subscribers
+ * and audit tooling can correlate the event with the originating DDL.
+ */
+public class TableMetaChange implements Writable {
+    @SerializedName("ci")
+    private long catalogId;
+    @SerializedName("cn")
+    private String catalogName;
+    @SerializedName("di")
+    private long dbId;
+    @SerializedName("dn")
+    private String dbName;
+    @SerializedName("ti")
+    private long tableId;
+    @SerializedName("tn")
+    private String tableName;
+    // master-side millis-since-epoch when this event was emitted
+    @SerializedName("ts")
+    private long eventTimeMs;
+
+    public TableMetaChange() {
+        // for persist
+    }
+
+    /** Build a TableMetaChange from a TableIf (master-side helper). */
+    public static TableMetaChange fromTable(TableIf table) {
+        long catalogId = -1L;
+        String catalogName = "";
+        long dbId = -1L;
+        String dbName = "";
+        DatabaseIf<?> db = table.getDatabase();
+        if (db != null) {
+            dbId = db.getId();
+            dbName = db.getFullName();
+            CatalogIf<?> catalog = db.getCatalog();
+            if (catalog != null) {
+                catalogId = catalog.getId();
+                catalogName = catalog.getName();
+            }
+        }
+        return new TableMetaChange(catalogId, catalogName, dbId, dbName,
+                table.getId(), table.getName());
+    }
+
+    public TableMetaChange(long catalogId, String catalogName,
+                              long dbId, String dbName,
+                              long tableId, String tableName) {
+        this(catalogId, catalogName, dbId, dbName, tableId, tableName, 
System.currentTimeMillis());
+    }
+
+    public TableMetaChange(long catalogId, String catalogName,
+                              long dbId, String dbName,
+                              long tableId, String tableName,
+                              long eventTimeMs) {
+        this.catalogId = catalogId;
+        this.catalogName = catalogName;
+        this.dbId = dbId;
+        this.dbName = dbName;
+        this.tableId = tableId;
+        this.tableName = tableName;
+        this.eventTimeMs = eventTimeMs;
+    }
+
+    public long getCatalogId() {
+        return catalogId;
+    }
+
+    public String getCatalogName() {
+        return catalogName;
+    }
+
+    public long getDbId() {
+        return dbId;
+    }
+
+    public String getDbName() {
+        return dbName;
+    }
+
+    public long getTableId() {
+        return tableId;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public long getEventTimeMs() {
+        return eventTimeMs;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    public static TableMetaChange read(DataInput in) throws IOException {
+        return GsonUtils.GSON.fromJson(Text.readString(in), 
TableMetaChange.class);
+    }
+
+    @Override
+    public String toString() {
+        return "TableMetaChange{catalogId=" + catalogId
+                + ", catalogName='" + catalogName + '\''
+                + ", dbId=" + dbId
+                + ", dbName='" + dbName + '\''
+                + ", tableId=" + tableId
+                + ", tableName='" + tableName + '\''
+                + ", eventTimeMs=" + eventTimeMs
+                + '}';
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to