This is an automated email from the ASF dual-hosted git repository.

Gabriel39 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 25f291673f1 [fix](fe) Reject Iceberg v3 row lineage columns (#63825)
25f291673f1 is described below

commit 25f291673f1e8e78e42677a7c6494b18afa1620a
Author: Gabriel <[email protected]>
AuthorDate: Mon Jun 29 10:33:26 2026 +0800

    [fix](fe) Reject Iceberg v3 row lineage columns (#63825)
    
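    Problem Summary: Creating an Iceberg table with format-version 3 could
    accept user columns named _row_id or _last_updated_sequence_number.
    Doris later appends Iceberg row lineage hidden columns with the same
    names, which can produce duplicate hidden-column metadata and confusing
    insert/query behavior. This change validates Iceberg create table
    definitions, including CTAS, and rejects those reserved row lineage
    column names when the effective Iceberg format version is 3 or newer.
    The effective version is resolved from the catalog-level table override
    for format-version first, then the table's own format-version property,
    then the catalog-level table default (for example
    table-default.format-version), and falls back to 2 when none is set or
    the value is not a valid integer. Doris also no longer forces
    format-version=2 onto a new table when the catalog already provides a
    format-version override or default.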
---
 .../datasource/iceberg/IcebergMetadataOps.java     |  8 ++-
 .../doris/datasource/iceberg/IcebergUtils.java     | 27 ++++++++
 .../trees/plans/commands/info/CreateTableInfo.java | 36 +++++++++++
 .../iceberg/IcebergDDLAndDMLPlanTest.java          | 71 ++++++++++++++++++++++
 .../datasource/iceberg/IcebergMetadataOpTest.java  | 50 ++++++++++++++-
 5 files changed, 190 insertions(+), 2 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index bb2d1debd8b..246ad347242 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -373,10 +373,16 @@ public class IcebergMetadataOps implements 
ExternalMetadataOps {
         Schema schema = new 
Schema(visit.asNestedType().asStructType().fields());
         Map<String, String> properties = createTableInfo.getProperties();
         properties.put(ExternalCatalog.DORIS_VERSION, 
ExternalCatalog.DORIS_VERSION_VALUE);
-        properties.putIfAbsent(TableProperties.FORMAT_VERSION, "2");
+        Map<String, String> catalogProperties = dorisCatalog.getProperties();
+        if (!properties.containsKey(TableProperties.FORMAT_VERSION)
+                && 
!IcebergUtils.hasIcebergCatalogFormatVersion(catalogProperties)) {
+            properties.put(TableProperties.FORMAT_VERSION, "2");
+        }
         properties.putIfAbsent(TableProperties.DELETE_MODE, 
RowLevelOperationMode.MERGE_ON_READ.modeName());
         properties.putIfAbsent(TableProperties.UPDATE_MODE, 
RowLevelOperationMode.MERGE_ON_READ.modeName());
         properties.putIfAbsent(TableProperties.MERGE_MODE, 
RowLevelOperationMode.MERGE_ON_READ.modeName());
+        createTableInfo.validateIcebergRowLineageColumns(
+                IcebergUtils.getEffectiveIcebergFormatVersion(properties, 
catalogProperties));
         PartitionSpec partitionSpec = 
IcebergUtils.solveIcebergPartitionSpec(createTableInfo.getPartitionDesc(),
                 schema);
         // Build and create table with optional sort order
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index ea36d755cfd..92c6171b669 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -189,6 +189,33 @@ public class IcebergUtils {
 
     private static final Pattern SNAPSHOT_ID = Pattern.compile("\\d+");
 
+    public static boolean hasIcebergCatalogFormatVersion(Map<String, String> 
catalogProperties) {
+        return 
catalogProperties.containsKey(CatalogProperties.TABLE_OVERRIDE_PREFIX + 
TableProperties.FORMAT_VERSION)
+                || 
catalogProperties.containsKey(CatalogProperties.TABLE_DEFAULT_PREFIX
+                        + TableProperties.FORMAT_VERSION);
+    }
+
+    public static int getEffectiveIcebergFormatVersion(Map<String, String> 
tableProperties,
+            Map<String, String> catalogProperties) {
+        String formatVersion = 
catalogProperties.get(CatalogProperties.TABLE_OVERRIDE_PREFIX
+                + TableProperties.FORMAT_VERSION);
+        if (formatVersion == null) {
+            formatVersion = 
tableProperties.get(TableProperties.FORMAT_VERSION);
+            if (formatVersion == null) {
+                formatVersion = 
catalogProperties.get(CatalogProperties.TABLE_DEFAULT_PREFIX
+                        + TableProperties.FORMAT_VERSION);
+            }
+        }
+        if (formatVersion == null) {
+            return 2;
+        }
+        try {
+            return Integer.parseInt(formatVersion);
+        } catch (NumberFormatException ignored) {
+            return 2;
+        }
+    }
+
     public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
         if (expr == null) {
             return null;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
index 14869e7925c..49e681cde11 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
@@ -49,6 +49,7 @@ import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
 import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
 import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -380,6 +381,9 @@ public class CreateTableInfo {
                     + " Make sure 'engine' type is specified when use the 
catalog: " + ctlName);
             }
         }
+        if (Strings.isNullOrEmpty(ctlName)) {
+            return;
+        }
         CatalogIf catalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(ctlName);
         if (catalog instanceof HMSExternalCatalog && 
!engineName.equals(ENGINE_HIVE)) {
             throw new AnalysisException("Hms type catalog can only use `hive` 
engine.");
@@ -791,6 +795,10 @@ public class CreateTableInfo {
                         + "and you can use 'bucket(num, column)' in 
'PARTITIONED BY'.");
             }
 
+            if (engineName.equalsIgnoreCase(ENGINE_ICEBERG)) {
+                validateIcebergRowLineageColumns();
+            }
+
             // Validate Iceberg sort order columns
             if (sortOrderFields != null && !sortOrderFields.isEmpty()) {
                 if (!engineName.equalsIgnoreCase(ENGINE_ICEBERG)) {
@@ -1105,6 +1113,34 @@ public class CreateTableInfo {
         }
     }
 
+    /**
+     * Validate that Iceberg v3 tables do not define row lineage reserved 
columns.
+     */
+    public void validateIcebergRowLineageColumns(int formatVersion) {
+        if (formatVersion < IcebergUtils.ICEBERG_ROW_LINEAGE_MIN_VERSION) {
+            return;
+        }
+        for (ColumnDefinition columnDef : columns) {
+            if (IcebergUtils.isIcebergRowLineageColumn(columnDef.getName())) {
+                throw new AnalysisException("Cannot create Iceberg v" + 
formatVersion
+                        + " table with reserved row lineage column: " + 
columnDef.getName());
+            }
+        }
+    }
+
+    private void validateIcebergRowLineageColumns() {
+        validateIcebergRowLineageColumns(getEffectiveIcebergFormatVersion());
+    }
+
+    private int getEffectiveIcebergFormatVersion() {
+        CatalogIf catalog = Strings.isNullOrEmpty(ctlName) ? null
+                : Env.getCurrentEnv().getCatalogMgr().getCatalog(ctlName);
+        if (catalog instanceof IcebergExternalCatalog) {
+            return IcebergUtils.getEffectiveIcebergFormatVersion(properties, 
catalog.getProperties());
+        }
+        return IcebergUtils.getEffectiveIcebergFormatVersion(properties, 
Collections.emptyMap());
+    }
+
     /**
      * analyzeEngine
      */
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergDDLAndDMLPlanTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergDDLAndDMLPlanTest.java
index 143438a71bb..dc344b08e13 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergDDLAndDMLPlanTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergDDLAndDMLPlanTest.java
@@ -36,6 +36,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
 import org.apache.doris.nereids.trees.plans.commands.DeleteFromCommand;
 import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
 import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
@@ -273,6 +274,76 @@ public class IcebergDDLAndDMLPlanTest extends 
TestWithFeService {
         assertContainsPhysicalSink(physicalPlan, 
PhysicalIcebergDeleteSink.class);
     }
 
+    @Test
+    public void testCreateIcebergV3TableRejectsRowLineageReservedColumn() 
throws Exception {
+        useIceberg();
+        String rowIdTable = "row_lineage_reserved_" + 
UUID.randomUUID().toString().replace("-", "");
+        String rowIdSql = "create table " + rowIdTable
+                + " (_row_id bigint) properties('format-version'='3')";
+        LogicalPlan rowIdPlan = parseStmt(rowIdSql);
+        Assertions.assertTrue(rowIdPlan instanceof CreateTableCommand);
+        
Assertions.assertThrows(org.apache.doris.nereids.exceptions.AnalysisException.class,
+                () -> ((CreateTableCommand) 
rowIdPlan).getCreateTableInfo().validate(connectContext));
+
+        String lastUpdatedSequenceNumberTable =
+                "row_lineage_reserved_" + 
UUID.randomUUID().toString().replace("-", "");
+        String lastUpdatedSequenceNumberSql = "create table " + 
lastUpdatedSequenceNumberTable
+                + " (_last_updated_sequence_number bigint) 
properties('format-version'='3')";
+        LogicalPlan lastUpdatedSequenceNumberPlan = 
parseStmt(lastUpdatedSequenceNumberSql);
+        Assertions.assertTrue(lastUpdatedSequenceNumberPlan instanceof 
CreateTableCommand);
+        
Assertions.assertThrows(org.apache.doris.nereids.exceptions.AnalysisException.class,
+                () -> ((CreateTableCommand) 
lastUpdatedSequenceNumberPlan).getCreateTableInfo()
+                        .validate(connectContext));
+
+        String formatV2Table = "row_lineage_reserved_" + 
UUID.randomUUID().toString().replace("-", "");
+        String formatV2Sql = "create table " + formatV2Table
+                + " (_row_id bigint) properties('format-version'='2')";
+        LogicalPlan formatV2Plan = parseStmt(formatV2Sql);
+        Assertions.assertTrue(formatV2Plan instanceof CreateTableCommand);
+        Assertions.assertDoesNotThrow(
+                () -> ((CreateTableCommand) 
formatV2Plan).getCreateTableInfo().validate(connectContext));
+    }
+
+    @Test
+    public void 
testCreateIcebergDefaultV3TableRejectsRowLineageReservedColumn() throws 
Exception {
+        useIceberg();
+        IcebergExternalCatalog catalog = (IcebergExternalCatalog) 
Env.getCurrentEnv()
+                .getCatalogMgr().getCatalog(catalogName);
+        
catalog.getCatalogProperty().addProperty("table-default.format-version", "3");
+        try {
+            String rowIdTable = "row_lineage_reserved_" + 
UUID.randomUUID().toString().replace("-", "");
+            String rowIdSql = "create table " + rowIdTable + " (_row_id 
bigint)";
+            LogicalPlan rowIdPlan = parseStmt(rowIdSql);
+            Assertions.assertTrue(rowIdPlan instanceof CreateTableCommand);
+            
Assertions.assertThrows(org.apache.doris.nereids.exceptions.AnalysisException.class,
+                    () -> ((CreateTableCommand) 
rowIdPlan).getCreateTableInfo().validate(connectContext));
+            
Assertions.assertFalse(catalog.getCatalog().tableExists(TableIdentifier.of(dbName,
 rowIdTable)));
+
+            String formatV2Table = "row_lineage_reserved_" + 
UUID.randomUUID().toString().replace("-", "");
+            String formatV2Sql = "create table " + formatV2Table
+                    + " (_row_id bigint) properties('format-version'='2')";
+            LogicalPlan formatV2Plan = parseStmt(formatV2Sql);
+            Assertions.assertTrue(formatV2Plan instanceof CreateTableCommand);
+            Assertions.assertDoesNotThrow(
+                    () -> ((CreateTableCommand) 
formatV2Plan).getCreateTableInfo().validate(connectContext));
+        } finally {
+            
catalog.getCatalogProperty().deleteProperty("table-default.format-version");
+        }
+    }
+
+    @Test
+    public void testIcebergV3CtasRejectsRowLineageReservedColumn() throws 
Exception {
+        useIceberg();
+        String ctasTable = "row_lineage_reserved_" + 
UUID.randomUUID().toString().replace("-", "");
+        String ctasSql = "create table " + ctasTable
+                + " properties('format-version'='3') as select 1 as _row_id";
+        LogicalPlan ctasPlan = parseStmt(ctasSql);
+        Assertions.assertTrue(ctasPlan instanceof CreateTableCommand);
+        
Assertions.assertThrows(org.apache.doris.nereids.exceptions.AnalysisException.class,
+                () -> ((CreateTableCommand) 
ctasPlan).validateCreateTableAsSelect(
+                        connectContext, ((CreateTableCommand) 
ctasPlan).getCtasQuery().get()));
+    }
+
     @Test
     public void testIcebergUpdatePlans() throws Exception {
         useIceberg();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergMetadataOpTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergMetadataOpTest.java
index 079a0c9c312..b72a9ffba20 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergMetadataOpTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergMetadataOpTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.datasource.iceberg;
 
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.ExternalDatabase;
@@ -28,7 +30,12 @@ import org.apache.doris.filesystem.FileIterator;
 import org.apache.doris.filesystem.FileSystem;
 import org.apache.doris.filesystem.Location;
 import org.apache.doris.fs.MemoryFileSystem;
+import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
 
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -36,6 +43,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.catalog.ViewCatalog;
 import org.junit.Assert;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import java.io.IOException;
@@ -129,6 +137,42 @@ public class IcebergMetadataOpTest {
         Assert.assertEquals(Collections.singletonList("DORIS_HORIZON_T"), 
tableNames);
     }
 
+    @Test
+    public void testPerformCreateTableRespectsCatalogDefaultFormatVersion() 
throws Exception {
+        Map<String, String> catalogProps = new HashMap<>();
+        catalogProps.put(CatalogProperties.TABLE_DEFAULT_PREFIX + 
TableProperties.FORMAT_VERSION, "3");
+        IcebergExternalCatalog dorisCatalog = mockHmsCatalog(catalogProps);
+        Catalog icebergCatalog = Mockito.mock(Catalog.class,
+                
Mockito.withSettings().extraInterfaces(SupportsNamespaces.class));
+        IcebergMetadataOps ops = new IcebergMetadataOps(dorisCatalog, 
icebergCatalog);
+
+        ExternalDatabase<?> dorisDb = Mockito.mock(ExternalDatabase.class);
+        Mockito.when(dorisDb.getRemoteName()).thenReturn("db");
+        Mockito.when(dorisDb.getTableNullable("tbl")).thenReturn(null);
+        Mockito.doReturn(dorisDb).when(dorisCatalog).getDbNullable("db");
+        Mockito.when(dorisCatalog.getName()).thenReturn("iceberg_catalog");
+        Mockito.when(icebergCatalog.tableExists(TableIdentifier.of("db", 
"tbl"))).thenReturn(false);
+
+        CreateTableInfo createTableInfo = Mockito.mock(CreateTableInfo.class);
+        Map<String, String> tableProps = new HashMap<>();
+        Mockito.when(createTableInfo.getDbName()).thenReturn("db");
+        Mockito.when(createTableInfo.getTableName()).thenReturn("tbl");
+        Mockito.when(createTableInfo.isIfNotExists()).thenReturn(false);
+        
Mockito.when(createTableInfo.getColumns()).thenReturn(Collections.singletonList(
+                new Column("id", Type.INT, true)));
+        Mockito.when(createTableInfo.getProperties()).thenReturn(tableProps);
+
+        ops.performCreateTable(createTableInfo);
+
+        Mockito.verify(createTableInfo).validateIcebergRowLineageColumns(3);
+        ArgumentCaptor<Map<String, String>> propsCaptor = 
ArgumentCaptor.forClass(Map.class);
+        
Mockito.verify(icebergCatalog).createTable(Mockito.eq(TableIdentifier.of("db", 
"tbl")),
+                Mockito.any(Schema.class), Mockito.any(PartitionSpec.class), 
propsCaptor.capture());
+        
Assert.assertFalse(propsCaptor.getValue().containsKey(TableProperties.FORMAT_VERSION));
+        Assert.assertEquals(3, IcebergUtils.getEffectiveIcebergFormatVersion(
+                propsCaptor.getValue(), catalogProps));
+    }
+
     @Test
     public void testDropTableCleansEmptyTableLocation() throws Exception {
         MemoryFileSystem fs = new MemoryFileSystem();
@@ -214,10 +258,14 @@ public class IcebergMetadataOpTest {
     }
 
     private IcebergExternalCatalog mockHmsCatalog() {
+        return mockHmsCatalog(Collections.emptyMap());
+    }
+
+    private IcebergExternalCatalog mockHmsCatalog(Map<String, String> 
catalogProperties) {
         IcebergExternalCatalog dorisCatalog = 
Mockito.mock(IcebergExternalCatalog.class);
         Mockito.when(dorisCatalog.getExecutionAuthenticator()).thenReturn(new 
ExecutionAuthenticator() {
         });
-        
Mockito.when(dorisCatalog.getProperties()).thenReturn(Collections.emptyMap());
+        
Mockito.when(dorisCatalog.getProperties()).thenReturn(catalogProperties);
         
Mockito.when(dorisCatalog.getIcebergCatalogType()).thenReturn(IcebergExternalCatalog.ICEBERG_HMS);
         Mockito.when(dorisCatalog.getCatalogProperty()).thenReturn(new 
CatalogProperty(null, Collections.emptyMap()));
         return dorisCatalog;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to