This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 9481c1928 [lake/paimon] Prohibit users from setting Paimon properties that Fluss depends on (#1826)
9481c1928 is described below
commit 9481c19289e93d8d7eb3d7b55159be8307a57141
Author: Liebing <[email protected]>
AuthorDate: Mon Oct 20 10:48:44 2025 +0800
[lake/paimon] Prohibit users from setting Paimon properties that Fluss depends on (#1826)
---
.../fluss/lake/paimon/PaimonLakeCatalog.java | 108 +---------------
.../fluss/lake/paimon/utils/PaimonConversions.java | 143 ++++++++++++++++++++-
.../lake/paimon/LakeEnabledTableCreateITCase.java | 31 +++++
3 files changed, 174 insertions(+), 108 deletions(-)
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
index 03ffd0cdc..b11d5adf2 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
@@ -19,7 +19,6 @@ package org.apache.fluss.lake.paimon;
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.config.Configuration;
-import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.lake.lakestorage.LakeCatalog;
@@ -28,7 +27,6 @@ import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.utils.IOUtils;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
@@ -41,8 +39,9 @@ import org.apache.paimon.types.DataTypes;
import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
+import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
+import static
org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema;
import static
org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchemaChanges;
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
@@ -66,11 +65,6 @@ public class PaimonLakeCatalog implements LakeCatalog {
private final Catalog paimonCatalog;
- // for fluss config
- private static final String FLUSS_CONF_PREFIX = "fluss.";
- // for paimon config
- private static final String PAIMON_CONF_PREFIX = "paimon.";
-
public PaimonLakeCatalog(Configuration configuration) {
this.paimonCatalog =
CatalogFactory.createCatalog(
@@ -86,7 +80,7 @@ public class PaimonLakeCatalog implements LakeCatalog {
public void createTable(TablePath tablePath, TableDescriptor
tableDescriptor)
throws TableAlreadyExistException {
// then, create the table
- Identifier paimonPath = toPaimonIdentifier(tablePath);
+ Identifier paimonPath = toPaimon(tablePath);
Schema paimonSchema = toPaimonSchema(tableDescriptor);
try {
createTable(paimonPath, paimonSchema);
@@ -111,9 +105,8 @@ public class PaimonLakeCatalog implements LakeCatalog {
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
throws TableNotExistException {
try {
- Identifier paimonPath = toPaimonIdentifier(tablePath);
- List<SchemaChange> paimonSchemaChanges =
- toPaimonSchemaChanges(tableChanges,
this::getFlussPropertyKeyToPaimon);
+ Identifier paimonPath = toPaimon(tablePath);
+ List<SchemaChange> paimonSchemaChanges =
toPaimonSchemaChanges(tableChanges);
alterTable(paimonPath, paimonSchemaChanges);
} catch (Catalog.ColumnAlreadyExistException |
Catalog.ColumnNotExistException e) {
// shouldn't happen before we support schema change
@@ -149,97 +142,6 @@ public class PaimonLakeCatalog implements LakeCatalog {
}
}
- private Identifier toPaimonIdentifier(TablePath tablePath) {
- return Identifier.create(tablePath.getDatabaseName(),
tablePath.getTableName());
- }
-
- private Schema toPaimonSchema(TableDescriptor tableDescriptor) {
- Schema.Builder schemaBuilder = Schema.newBuilder();
- Options options = new Options();
-
- // set default properties
- setPaimonDefaultProperties(options);
-
- // When bucket key is undefined, it should use dynamic bucket (bucket
= -1) mode.
- List<String> bucketKeys = tableDescriptor.getBucketKeys();
- if (!bucketKeys.isEmpty()) {
- int numBuckets =
- tableDescriptor
- .getTableDistribution()
-
.flatMap(TableDescriptor.TableDistribution::getBucketCount)
- .orElseThrow(
- () ->
- new IllegalArgumentException(
- "Bucket count should be
set."));
- options.set(CoreOptions.BUCKET, numBuckets);
- options.set(CoreOptions.BUCKET_KEY, String.join(",", bucketKeys));
- } else {
- options.set(CoreOptions.BUCKET, CoreOptions.BUCKET.defaultValue());
- }
-
- // set schema
- for (org.apache.fluss.metadata.Schema.Column column :
- tableDescriptor.getSchema().getColumns()) {
- String columnName = column.getName();
- if (SYSTEM_COLUMNS.containsKey(columnName)) {
- throw new InvalidTableException(
- "Column "
- + columnName
- + " conflicts with a system column name of
paimon table, please rename the column.");
- }
- schemaBuilder.column(
- columnName,
-
column.getDataType().accept(FlussDataTypeToPaimonDataType.INSTANCE),
- column.getComment().orElse(null));
- }
-
- // add system metadata columns to schema
- for (Map.Entry<String, DataType> systemColumn :
SYSTEM_COLUMNS.entrySet()) {
- schemaBuilder.column(systemColumn.getKey(),
systemColumn.getValue());
- }
-
- // set pk
- if (tableDescriptor.hasPrimaryKey()) {
- schemaBuilder.primaryKey(
-
tableDescriptor.getSchema().getPrimaryKey().get().getColumnNames());
- options.set(
- CoreOptions.CHANGELOG_PRODUCER.key(),
- CoreOptions.ChangelogProducer.INPUT.toString());
- }
- // set partition keys
- schemaBuilder.partitionKeys(tableDescriptor.getPartitionKeys());
-
- // set properties to paimon schema
- tableDescriptor.getProperties().forEach((k, v) ->
setFlussPropertyToPaimon(k, v, options));
- tableDescriptor
- .getCustomProperties()
- .forEach((k, v) -> setFlussPropertyToPaimon(k, v, options));
- schemaBuilder.options(options.toMap());
- return schemaBuilder.build();
- }
-
- private void setPaimonDefaultProperties(Options options) {
- // set partition.legacy-name to false, otherwise paimon will use
toString for all types,
- // which will cause inconsistent partition value for a same binary
value
- options.set(CoreOptions.PARTITION_GENERATE_LEGCY_NAME, false);
- }
-
- private void setFlussPropertyToPaimon(String key, String value, Options
options) {
- if (key.startsWith(PAIMON_CONF_PREFIX)) {
- options.set(key.substring(PAIMON_CONF_PREFIX.length()), value);
- } else {
- options.set(FLUSS_CONF_PREFIX + key, value);
- }
- }
-
- private String getFlussPropertyKeyToPaimon(String key) {
- if (key.startsWith(PAIMON_CONF_PREFIX)) {
- return key.substring(PAIMON_CONF_PREFIX.length());
- } else {
- return FLUSS_CONF_PREFIX + key;
- }
- }
-
@Override
public void close() {
IOUtils.closeQuietly(paimonCatalog, "paimon catalog");
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
index a9491659f..2c42c471e 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
@@ -17,26 +17,52 @@
package org.apache.fluss.lake.paimon.utils;
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.exception.InvalidConfigException;
+import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.lake.paimon.FlussDataTypeToPaimonDataType;
import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
import org.apache.fluss.metadata.TableChange;
+import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.ChangeType;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
-import java.util.function.Function;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
/** Utils for conversion between Paimon and Fluss. */
public class PaimonConversions {
+ // for fluss config
+ private static final String FLUSS_CONF_PREFIX = "fluss.";
+ // for paimon config
+ private static final String PAIMON_CONF_PREFIX = "paimon.";
+
+ /** Paimon config options set by Fluss should not be set by users. */
+ @VisibleForTesting public static final Set<String>
PAIMON_UNSETTABLE_OPTIONS = new HashSet<>();
+
+ static {
+ PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET.key());
+ PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET_KEY.key());
+
PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.PARTITION_GENERATE_LEGCY_NAME.key());
+ }
+
public static RowKind toRowKind(ChangeType changeType) {
switch (changeType) {
case APPEND_ONLY:
@@ -80,8 +106,7 @@ public class PaimonConversions {
.getFieldOrNull(flussRowAsPaimonRow);
}
- public static List<SchemaChange> toPaimonSchemaChanges(
- List<TableChange> tableChanges, Function<String, String>
optionKeyTransformer) {
+ public static List<SchemaChange> toPaimonSchemaChanges(List<TableChange>
tableChanges) {
List<SchemaChange> schemaChanges = new
ArrayList<>(tableChanges.size());
for (TableChange tableChange : tableChanges) {
@@ -89,13 +114,13 @@ public class PaimonConversions {
TableChange.SetOption setOption = (TableChange.SetOption)
tableChange;
schemaChanges.add(
SchemaChange.setOption(
- optionKeyTransformer.apply(setOption.getKey()),
+
convertFlussPropertyKeyToPaimon(setOption.getKey()),
setOption.getValue()));
} else if (tableChange instanceof TableChange.ResetOption) {
TableChange.ResetOption resetOption =
(TableChange.ResetOption) tableChange;
schemaChanges.add(
SchemaChange.removeOption(
-
optionKeyTransformer.apply(resetOption.getKey())));
+
convertFlussPropertyKeyToPaimon(resetOption.getKey())));
} else {
throw new UnsupportedOperationException(
"Unsupported table change: " + tableChange.getClass());
@@ -104,4 +129,112 @@ public class PaimonConversions {
return schemaChanges;
}
+
+ public static Schema toPaimonSchema(TableDescriptor tableDescriptor) {
+ // validate paimon options first
+ validatePaimonOptions(tableDescriptor.getProperties());
+ validatePaimonOptions(tableDescriptor.getCustomProperties());
+
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ Options options = new Options();
+
+ // set default properties
+ setPaimonDefaultProperties(options);
+
+ // When bucket key is undefined, it should use dynamic bucket (bucket
= -1) mode.
+ List<String> bucketKeys = tableDescriptor.getBucketKeys();
+ if (!bucketKeys.isEmpty()) {
+ int numBuckets =
+ tableDescriptor
+ .getTableDistribution()
+
.flatMap(TableDescriptor.TableDistribution::getBucketCount)
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ "Bucket count should be
set."));
+ options.set(CoreOptions.BUCKET, numBuckets);
+ options.set(CoreOptions.BUCKET_KEY, String.join(",", bucketKeys));
+ } else {
+ options.set(CoreOptions.BUCKET, CoreOptions.BUCKET.defaultValue());
+ }
+
+ // set schema
+ for (org.apache.fluss.metadata.Schema.Column column :
+ tableDescriptor.getSchema().getColumns()) {
+ String columnName = column.getName();
+ if (SYSTEM_COLUMNS.containsKey(columnName)) {
+ throw new InvalidTableException(
+ "Column "
+ + columnName
+ + " conflicts with a system column name of
paimon table, please rename the column.");
+ }
+ schemaBuilder.column(
+ columnName,
+
column.getDataType().accept(FlussDataTypeToPaimonDataType.INSTANCE),
+ column.getComment().orElse(null));
+ }
+
+ // add system metadata columns to schema
+ for (Map.Entry<String, DataType> systemColumn :
SYSTEM_COLUMNS.entrySet()) {
+ schemaBuilder.column(systemColumn.getKey(),
systemColumn.getValue());
+ }
+
+ // set pk
+ if (tableDescriptor.hasPrimaryKey()) {
+ schemaBuilder.primaryKey(
+
tableDescriptor.getSchema().getPrimaryKey().get().getColumnNames());
+ options.set(
+ CoreOptions.CHANGELOG_PRODUCER.key(),
+ CoreOptions.ChangelogProducer.INPUT.toString());
+ }
+ // set partition keys
+ schemaBuilder.partitionKeys(tableDescriptor.getPartitionKeys());
+
+ // set properties to paimon schema
+ tableDescriptor.getProperties().forEach((k, v) ->
setFlussPropertyToPaimon(k, v, options));
+ tableDescriptor
+ .getCustomProperties()
+ .forEach((k, v) -> setFlussPropertyToPaimon(k, v, options));
+ schemaBuilder.options(options.toMap());
+ return schemaBuilder.build();
+ }
+
+ private static void validatePaimonOptions(Map<String, String> properties) {
+ properties.forEach(
+ (k, v) -> {
+ String paimonKey = k;
+ if (k.startsWith(PAIMON_CONF_PREFIX)) {
+ paimonKey = k.substring(PAIMON_CONF_PREFIX.length());
+ }
+ if (PAIMON_UNSETTABLE_OPTIONS.contains(paimonKey)) {
+ throw new InvalidConfigException(
+ String.format(
+ "The Paimon option %s will be set
automatically by Fluss "
+ + "and should not be set
manually.",
+ k));
+ }
+ });
+ }
+
+ private static void setPaimonDefaultProperties(Options options) {
+ // set partition.legacy-name to false, otherwise paimon will use
toString for all types,
+ // which will cause inconsistent partition value for the same binary
value
+ options.set(CoreOptions.PARTITION_GENERATE_LEGCY_NAME, false);
+ }
+
+ private static void setFlussPropertyToPaimon(String key, String value,
Options options) {
+ if (key.startsWith(PAIMON_CONF_PREFIX)) {
+ options.set(key.substring(PAIMON_CONF_PREFIX.length()), value);
+ } else {
+ options.set(FLUSS_CONF_PREFIX + key, value);
+ }
+ }
+
+ private static String convertFlussPropertyKeyToPaimon(String key) {
+ if (key.startsWith(PAIMON_CONF_PREFIX)) {
+ return key.substring(PAIMON_CONF_PREFIX.length());
+ } else {
+ return FLUSS_CONF_PREFIX + key;
+ }
+ }
}
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 99f33b05e..37033ea74 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -57,6 +57,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
+import static
org.apache.fluss.lake.paimon.utils.PaimonConversions.PAIMON_UNSETTABLE_OPTIONS;
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
@@ -370,6 +371,36 @@ class LakeEnabledTableCreateITCase {
BUCKET_NUM);
}
+ @Test
+ void testCreateLakeEnableTableWithUnsettablePaimonOptions() {
+ Map<String, String> customProperties = new HashMap<>();
+
+ for (String key : PAIMON_UNSETTABLE_OPTIONS) {
+ customProperties.clear();
+ customProperties.put(key, "v");
+
+ TableDescriptor table =
+ TableDescriptor.builder()
+ .schema(
+ Schema.newBuilder()
+ .column("c1", DataTypes.INT())
+ .column("c2", DataTypes.STRING())
+ .build())
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED,
true)
+ .customProperties(customProperties)
+ .distributedBy(BUCKET_NUM, "c1", "c2")
+ .build();
+ TablePath tablePath = TablePath.of(DATABASE,
"table_unsettable_paimon_option");
+ assertThatThrownBy(() -> admin.createTable(tablePath, table,
false).get())
+ .cause()
+ .isInstanceOf(InvalidConfigException.class)
+ .hasMessage(
+ String.format(
+ "The Paimon option %s will be set
automatically by Fluss and should not be set manually.",
+ key));
+ }
+ }
+
@Test
void testAlterLakeEnabledLogTable() throws Exception {
Map<String, String> customProperties = new HashMap<>();