This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 9481c1928 [lake/paimon] Prohibit users from setting Paimon properties that Fluss depends on (#1826)
9481c1928 is described below

commit 9481c19289e93d8d7eb3d7b55159be8307a57141
Author: Liebing <[email protected]>
AuthorDate: Mon Oct 20 10:48:44 2025 +0800

    [lake/paimon] Prohibit users from setting Paimon properties that Fluss depends on (#1826)
---
 .../fluss/lake/paimon/PaimonLakeCatalog.java       | 108 +---------------
 .../fluss/lake/paimon/utils/PaimonConversions.java | 143 ++++++++++++++++++++-
 .../lake/paimon/LakeEnabledTableCreateITCase.java  |  31 +++++
 3 files changed, 174 insertions(+), 108 deletions(-)

diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
index 03ffd0cdc..b11d5adf2 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
@@ -19,7 +19,6 @@ package org.apache.fluss.lake.paimon;
 
 import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.config.Configuration;
-import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.exception.TableAlreadyExistException;
 import org.apache.fluss.exception.TableNotExistException;
 import org.apache.fluss.lake.lakestorage.LakeCatalog;
@@ -28,7 +27,6 @@ import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.utils.IOUtils;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
@@ -41,8 +39,9 @@ import org.apache.paimon.types.DataTypes;
 
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 
+import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
+import static 
org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema;
 import static 
org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchemaChanges;
 import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
@@ -66,11 +65,6 @@ public class PaimonLakeCatalog implements LakeCatalog {
 
     private final Catalog paimonCatalog;
 
-    // for fluss config
-    private static final String FLUSS_CONF_PREFIX = "fluss.";
-    // for paimon config
-    private static final String PAIMON_CONF_PREFIX = "paimon.";
-
     public PaimonLakeCatalog(Configuration configuration) {
         this.paimonCatalog =
                 CatalogFactory.createCatalog(
@@ -86,7 +80,7 @@ public class PaimonLakeCatalog implements LakeCatalog {
     public void createTable(TablePath tablePath, TableDescriptor 
tableDescriptor)
             throws TableAlreadyExistException {
         // then, create the table
-        Identifier paimonPath = toPaimonIdentifier(tablePath);
+        Identifier paimonPath = toPaimon(tablePath);
         Schema paimonSchema = toPaimonSchema(tableDescriptor);
         try {
             createTable(paimonPath, paimonSchema);
@@ -111,9 +105,8 @@ public class PaimonLakeCatalog implements LakeCatalog {
     public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
             throws TableNotExistException {
         try {
-            Identifier paimonPath = toPaimonIdentifier(tablePath);
-            List<SchemaChange> paimonSchemaChanges =
-                    toPaimonSchemaChanges(tableChanges, 
this::getFlussPropertyKeyToPaimon);
+            Identifier paimonPath = toPaimon(tablePath);
+            List<SchemaChange> paimonSchemaChanges = 
toPaimonSchemaChanges(tableChanges);
             alterTable(paimonPath, paimonSchemaChanges);
         } catch (Catalog.ColumnAlreadyExistException | 
Catalog.ColumnNotExistException e) {
             // shouldn't happen before we support schema change
@@ -149,97 +142,6 @@ public class PaimonLakeCatalog implements LakeCatalog {
         }
     }
 
-    private Identifier toPaimonIdentifier(TablePath tablePath) {
-        return Identifier.create(tablePath.getDatabaseName(), 
tablePath.getTableName());
-    }
-
-    private Schema toPaimonSchema(TableDescriptor tableDescriptor) {
-        Schema.Builder schemaBuilder = Schema.newBuilder();
-        Options options = new Options();
-
-        // set default properties
-        setPaimonDefaultProperties(options);
-
-        // When bucket key is undefined, it should use dynamic bucket (bucket 
= -1) mode.
-        List<String> bucketKeys = tableDescriptor.getBucketKeys();
-        if (!bucketKeys.isEmpty()) {
-            int numBuckets =
-                    tableDescriptor
-                            .getTableDistribution()
-                            
.flatMap(TableDescriptor.TableDistribution::getBucketCount)
-                            .orElseThrow(
-                                    () ->
-                                            new IllegalArgumentException(
-                                                    "Bucket count should be 
set."));
-            options.set(CoreOptions.BUCKET, numBuckets);
-            options.set(CoreOptions.BUCKET_KEY, String.join(",", bucketKeys));
-        } else {
-            options.set(CoreOptions.BUCKET, CoreOptions.BUCKET.defaultValue());
-        }
-
-        // set schema
-        for (org.apache.fluss.metadata.Schema.Column column :
-                tableDescriptor.getSchema().getColumns()) {
-            String columnName = column.getName();
-            if (SYSTEM_COLUMNS.containsKey(columnName)) {
-                throw new InvalidTableException(
-                        "Column "
-                                + columnName
-                                + " conflicts with a system column name of 
paimon table, please rename the column.");
-            }
-            schemaBuilder.column(
-                    columnName,
-                    
column.getDataType().accept(FlussDataTypeToPaimonDataType.INSTANCE),
-                    column.getComment().orElse(null));
-        }
-
-        // add system metadata columns to schema
-        for (Map.Entry<String, DataType> systemColumn : 
SYSTEM_COLUMNS.entrySet()) {
-            schemaBuilder.column(systemColumn.getKey(), 
systemColumn.getValue());
-        }
-
-        // set pk
-        if (tableDescriptor.hasPrimaryKey()) {
-            schemaBuilder.primaryKey(
-                    
tableDescriptor.getSchema().getPrimaryKey().get().getColumnNames());
-            options.set(
-                    CoreOptions.CHANGELOG_PRODUCER.key(),
-                    CoreOptions.ChangelogProducer.INPUT.toString());
-        }
-        // set partition keys
-        schemaBuilder.partitionKeys(tableDescriptor.getPartitionKeys());
-
-        // set properties to paimon schema
-        tableDescriptor.getProperties().forEach((k, v) -> 
setFlussPropertyToPaimon(k, v, options));
-        tableDescriptor
-                .getCustomProperties()
-                .forEach((k, v) -> setFlussPropertyToPaimon(k, v, options));
-        schemaBuilder.options(options.toMap());
-        return schemaBuilder.build();
-    }
-
-    private void setPaimonDefaultProperties(Options options) {
-        // set partition.legacy-name to false, otherwise paimon will use 
toString for all types,
-        // which will cause inconsistent partition value for a same binary 
value
-        options.set(CoreOptions.PARTITION_GENERATE_LEGCY_NAME, false);
-    }
-
-    private void setFlussPropertyToPaimon(String key, String value, Options 
options) {
-        if (key.startsWith(PAIMON_CONF_PREFIX)) {
-            options.set(key.substring(PAIMON_CONF_PREFIX.length()), value);
-        } else {
-            options.set(FLUSS_CONF_PREFIX + key, value);
-        }
-    }
-
-    private String getFlussPropertyKeyToPaimon(String key) {
-        if (key.startsWith(PAIMON_CONF_PREFIX)) {
-            return key.substring(PAIMON_CONF_PREFIX.length());
-        } else {
-            return FLUSS_CONF_PREFIX + key;
-        }
-    }
-
     @Override
     public void close() {
         IOUtils.closeQuietly(paimonCatalog, "paimon catalog");
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
index a9491659f..2c42c471e 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
@@ -17,26 +17,52 @@
 
 package org.apache.fluss.lake.paimon.utils;
 
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.exception.InvalidConfigException;
+import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.lake.paimon.FlussDataTypeToPaimonDataType;
 import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
 import org.apache.fluss.metadata.TableChange;
+import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.ChangeType;
 import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.row.InternalRow;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
-import java.util.function.Function;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
 
 /** Utils for conversion between Paimon and Fluss. */
 public class PaimonConversions {
 
+    // for fluss config
+    private static final String FLUSS_CONF_PREFIX = "fluss.";
+    // for paimon config
+    private static final String PAIMON_CONF_PREFIX = "paimon.";
+
+    /** Paimon config options set by Fluss should not be set by users. */
+    @VisibleForTesting public static final Set<String> 
PAIMON_UNSETTABLE_OPTIONS = new HashSet<>();
+
+    static {
+        PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET.key());
+        PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET_KEY.key());
+        
PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.PARTITION_GENERATE_LEGCY_NAME.key());
+    }
+
     public static RowKind toRowKind(ChangeType changeType) {
         switch (changeType) {
             case APPEND_ONLY:
@@ -80,8 +106,7 @@ public class PaimonConversions {
                 .getFieldOrNull(flussRowAsPaimonRow);
     }
 
-    public static List<SchemaChange> toPaimonSchemaChanges(
-            List<TableChange> tableChanges, Function<String, String> 
optionKeyTransformer) {
+    public static List<SchemaChange> toPaimonSchemaChanges(List<TableChange> 
tableChanges) {
         List<SchemaChange> schemaChanges = new 
ArrayList<>(tableChanges.size());
 
         for (TableChange tableChange : tableChanges) {
@@ -89,13 +114,13 @@ public class PaimonConversions {
                 TableChange.SetOption setOption = (TableChange.SetOption) 
tableChange;
                 schemaChanges.add(
                         SchemaChange.setOption(
-                                optionKeyTransformer.apply(setOption.getKey()),
+                                
convertFlussPropertyKeyToPaimon(setOption.getKey()),
                                 setOption.getValue()));
             } else if (tableChange instanceof TableChange.ResetOption) {
                 TableChange.ResetOption resetOption = 
(TableChange.ResetOption) tableChange;
                 schemaChanges.add(
                         SchemaChange.removeOption(
-                                
optionKeyTransformer.apply(resetOption.getKey())));
+                                
convertFlussPropertyKeyToPaimon(resetOption.getKey())));
             } else {
                 throw new UnsupportedOperationException(
                         "Unsupported table change: " + tableChange.getClass());
@@ -104,4 +129,112 @@ public class PaimonConversions {
 
         return schemaChanges;
     }
+
+    public static Schema toPaimonSchema(TableDescriptor tableDescriptor) {
+        // validate paimon options first
+        validatePaimonOptions(tableDescriptor.getProperties());
+        validatePaimonOptions(tableDescriptor.getCustomProperties());
+
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        Options options = new Options();
+
+        // set default properties
+        setPaimonDefaultProperties(options);
+
+        // When bucket key is undefined, it should use dynamic bucket (bucket 
= -1) mode.
+        List<String> bucketKeys = tableDescriptor.getBucketKeys();
+        if (!bucketKeys.isEmpty()) {
+            int numBuckets =
+                    tableDescriptor
+                            .getTableDistribution()
+                            
.flatMap(TableDescriptor.TableDistribution::getBucketCount)
+                            .orElseThrow(
+                                    () ->
+                                            new IllegalArgumentException(
+                                                    "Bucket count should be set."));
+            options.set(CoreOptions.BUCKET, numBuckets);
+            options.set(CoreOptions.BUCKET_KEY, String.join(",", bucketKeys));
+        } else {
+            options.set(CoreOptions.BUCKET, CoreOptions.BUCKET.defaultValue());
+        }
+
+        // set schema
+        for (org.apache.fluss.metadata.Schema.Column column :
+                tableDescriptor.getSchema().getColumns()) {
+            String columnName = column.getName();
+            if (SYSTEM_COLUMNS.containsKey(columnName)) {
+                throw new InvalidTableException(
+                        "Column "
+                                + columnName
+                                + " conflicts with a system column name of paimon table, please rename the column.");
+            }
+            schemaBuilder.column(
+                    columnName,
+                    
column.getDataType().accept(FlussDataTypeToPaimonDataType.INSTANCE),
+                    column.getComment().orElse(null));
+        }
+
+        // add system metadata columns to schema
+        for (Map.Entry<String, DataType> systemColumn : 
SYSTEM_COLUMNS.entrySet()) {
+            schemaBuilder.column(systemColumn.getKey(), 
systemColumn.getValue());
+        }
+
+        // set pk
+        if (tableDescriptor.hasPrimaryKey()) {
+            schemaBuilder.primaryKey(
+                    
tableDescriptor.getSchema().getPrimaryKey().get().getColumnNames());
+            options.set(
+                    CoreOptions.CHANGELOG_PRODUCER.key(),
+                    CoreOptions.ChangelogProducer.INPUT.toString());
+        }
+        // set partition keys
+        schemaBuilder.partitionKeys(tableDescriptor.getPartitionKeys());
+
+        // set properties to paimon schema
+        tableDescriptor.getProperties().forEach((k, v) -> 
setFlussPropertyToPaimon(k, v, options));
+        tableDescriptor
+                .getCustomProperties()
+                .forEach((k, v) -> setFlussPropertyToPaimon(k, v, options));
+        schemaBuilder.options(options.toMap());
+        return schemaBuilder.build();
+    }
+
+    private static void validatePaimonOptions(Map<String, String> properties) {
+        properties.forEach(
+                (k, v) -> {
+                    String paimonKey = k;
+                    if (k.startsWith(PAIMON_CONF_PREFIX)) {
+                        paimonKey = k.substring(PAIMON_CONF_PREFIX.length());
+                    }
+                    if (PAIMON_UNSETTABLE_OPTIONS.contains(paimonKey)) {
+                        throw new InvalidConfigException(
+                                String.format(
+                                        "The Paimon option %s will be set automatically by Fluss "
+                                                + "and should not be set manually.",
+                                        k));
+                    }
+                });
+    }
+
+    private static void setPaimonDefaultProperties(Options options) {
+        // set partition.legacy-name to false, otherwise paimon will use toString for all types,
+        // which will cause inconsistent partition value for the same binary value
+        options.set(CoreOptions.PARTITION_GENERATE_LEGCY_NAME, false);
+    }
+
+    private static void setFlussPropertyToPaimon(String key, String value, 
Options options) {
+        if (key.startsWith(PAIMON_CONF_PREFIX)) {
+            options.set(key.substring(PAIMON_CONF_PREFIX.length()), value);
+        } else {
+            options.set(FLUSS_CONF_PREFIX + key, value);
+        }
+    }
+
+    private static String convertFlussPropertyKeyToPaimon(String key) {
+        if (key.startsWith(PAIMON_CONF_PREFIX)) {
+            return key.substring(PAIMON_CONF_PREFIX.length());
+        } else {
+            return FLUSS_CONF_PREFIX + key;
+        }
+    }
 }
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 99f33b05e..37033ea74 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -57,6 +57,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
+import static 
org.apache.fluss.lake.paimon.utils.PaimonConversions.PAIMON_UNSETTABLE_OPTIONS;
 import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
@@ -370,6 +371,36 @@ class LakeEnabledTableCreateITCase {
                 BUCKET_NUM);
     }
 
+    @Test
+    void testCreateLakeEnableTableWithUnsettablePaimonOptions() {
+        Map<String, String> customProperties = new HashMap<>();
+
+        for (String key : PAIMON_UNSETTABLE_OPTIONS) {
+            customProperties.clear();
+            customProperties.put(key, "v");
+
+            TableDescriptor table =
+                    TableDescriptor.builder()
+                            .schema(
+                                    Schema.newBuilder()
+                                            .column("c1", DataTypes.INT())
+                                            .column("c2", DataTypes.STRING())
+                                            .build())
+                            .property(ConfigOptions.TABLE_DATALAKE_ENABLED, 
true)
+                            .customProperties(customProperties)
+                            .distributedBy(BUCKET_NUM, "c1", "c2")
+                            .build();
+            TablePath tablePath = TablePath.of(DATABASE, 
"table_unsettable_paimon_option");
+            assertThatThrownBy(() -> admin.createTable(tablePath, table, 
false).get())
+                    .cause()
+                    .isInstanceOf(InvalidConfigException.class)
+                    .hasMessage(
+                            String.format(
+                                    "The Paimon option %s will be set automatically by Fluss and should not be set manually.",
+                                    key));
+        }
+    }
+
     @Test
     void testAlterLakeEnabledLogTable() throws Exception {
         Map<String, String> customProperties = new HashMap<>();

Reply via email to