This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 85904f2ff [lake] Paimon lake table support alter table properties
(#1754)
85904f2ff is described below
commit 85904f2ff7b050ed984829361d69ae981f38c960
Author: Liebing <[email protected]>
AuthorDate: Mon Sep 29 19:07:18 2025 +0800
[lake] Paimon lake table support alter table properties (#1754)
---
.../apache/fluss/lake/lakestorage/LakeCatalog.java | 14 +++
.../lake/lakestorage/PluginLakeStorageWrapper.java | 12 ++
.../fluss/lake/lakestorage/LakeStorageTest.java | 7 ++
.../fluss/lake/iceberg/IcebergLakeCatalog.java | 9 ++
.../apache/fluss/lake/lance/LanceLakeCatalog.java | 9 ++
.../fluss/lake/paimon/PaimonLakeCatalog.java | 41 +++++++
.../fluss/lake/paimon/utils/PaimonConversions.java | 29 +++++
.../lake/paimon/LakeEnabledTableCreateITCase.java | 126 ++++++++++++++++++++-
.../fluss/lake/paimon/PaimonLakeCatalogTest.java | 98 ++++++++++++++++
.../server/coordinator/CoordinatorService.java | 43 ++++++-
.../fluss/server/coordinator/MetadataManager.java | 51 ++++++---
.../fluss/server/utils/ServerRpcMessageUtils.java | 45 +-------
.../lakehouse/TestingPaimonStoragePlugin.java | 9 ++
13 files changed, 435 insertions(+), 58 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
index 813c6629f..2dce05471 100644
---
a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
+++
b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
@@ -19,9 +19,13 @@ package org.apache.fluss.lake.lakestorage;
import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.exception.TableAlreadyExistException;
+import org.apache.fluss.exception.TableNotExistException;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
+import java.util.List;
+
/**
* A catalog interface to modify metadata in external datalake.
*
@@ -40,6 +44,16 @@ public interface LakeCatalog extends AutoCloseable {
void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
throws TableAlreadyExistException;
+ /**
+ * Alter a table in lake.
+ *
+ * @param tablePath path of the table to be altered
+ * @param tableChanges The changes to be applied to the table
+ * @throws TableNotExistException if the table does not exist
+ */
+ void alterTable(TablePath tablePath, List<TableChange> tableChanges)
+ throws TableNotExistException;
+
@Override
default void close() throws Exception {
// default do nothing
diff --git
a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java
b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java
index ce4143d88..9c75d3609 100644
---
a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java
+++
b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java
@@ -19,13 +19,17 @@ package org.apache.fluss.lake.lakestorage;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.TableAlreadyExistException;
+import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.lake.source.LakeSource;
import org.apache.fluss.lake.writer.LakeTieringFactory;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.utils.TemporaryClassLoaderContext;
import org.apache.fluss.utils.WrappingProxy;
+import java.util.List;
+
/**
* A wrapper around {@link LakeStoragePlugin} that ensures the plugin
classloader is used for all
* {@link LakeCatalog} operations.
@@ -78,6 +82,14 @@ public class PluginLakeStorageWrapper implements
LakeStoragePlugin {
}
}
+ @Override
+ public void alterTable(TablePath tablePath, List<TableChange>
tableChanges)
+ throws TableNotExistException {
+ try (TemporaryClassLoaderContext ignored =
TemporaryClassLoaderContext.of(loader)) {
+ inner.alterTable(tablePath, tableChanges);
+ }
+ }
+
@Override
public void close() throws Exception {
try (TemporaryClassLoaderContext ignored =
TemporaryClassLoaderContext.of(loader)) {
diff --git
a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java
b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java
index f0452812b..5812cc3ca 100644
---
a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java
+++
b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java
@@ -19,8 +19,10 @@ package org.apache.fluss.lake.lakestorage;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.TableAlreadyExistException;
+import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.lake.source.LakeSource;
import org.apache.fluss.lake.writer.LakeTieringFactory;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.plugin.PluginManager;
@@ -30,6 +32,7 @@ import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@@ -145,5 +148,9 @@ class LakeStorageTest {
@Override
public void createTable(TablePath tablePath, TableDescriptor
tableDescriptor)
throws TableAlreadyExistException {}
+
+ @Override
+ public void alterTable(TablePath tablePath, List<TableChange>
tableChanges)
+ throws TableNotExistException {}
}
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
index 842fcddde..7b3a913b8 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -20,8 +20,10 @@ package org.apache.fluss.lake.iceberg;
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.TableAlreadyExistException;
+import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
import org.apache.fluss.lake.lakestorage.LakeCatalog;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.utils.IOUtils;
@@ -112,6 +114,13 @@ public class IcebergLakeCatalog implements LakeCatalog {
}
}
+ @Override
+ public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
+ throws TableNotExistException {
+ throw new UnsupportedOperationException(
+ "Alter table is not supported for Iceberg at the moment");
+ }
+
private TableIdentifier toIcebergTableIdentifier(TablePath tablePath) {
return TableIdentifier.of(tablePath.getDatabaseName(),
tablePath.getTableName());
}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java
index 16a063499..2a55fc46a 100644
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java
+++
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java
@@ -19,9 +19,11 @@ package org.apache.fluss.lake.lance;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.lake.lakestorage.LakeCatalog;
import org.apache.fluss.lake.lance.utils.LanceArrowUtils;
import org.apache.fluss.lake.lance.utils.LanceDatasetAdapter;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
@@ -68,6 +70,13 @@ public class LanceLakeCatalog implements LakeCatalog {
}
}
+ @Override
+ public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
+ throws TableNotExistException {
+ throw new UnsupportedOperationException(
+ "Alter table is not supported for Lance at the moment");
+ }
+
@Override
public void close() throws Exception {
LakeCatalog.super.close();
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
index 9504ccc51..f84b6e590 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
@@ -17,10 +17,13 @@
package org.apache.fluss.lake.paimon;
+import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.TableAlreadyExistException;
+import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.lake.lakestorage.LakeCatalog;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.utils.IOUtils;
@@ -32,6 +35,7 @@ import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -39,6 +43,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import static
org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchemaChanges;
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
@@ -72,6 +77,11 @@ public class PaimonLakeCatalog implements LakeCatalog {
CatalogContext.create(Options.fromMap(configuration.toMap())));
}
+ @VisibleForTesting
+ protected Catalog getPaimonCatalog() {
+ return paimonCatalog;
+ }
+
@Override
public void createTable(TablePath tablePath, TableDescriptor
tableDescriptor)
throws TableAlreadyExistException {
@@ -97,6 +107,20 @@ public class PaimonLakeCatalog implements LakeCatalog {
}
}
+ @Override
+ public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
+ throws TableNotExistException {
+ try {
+ Identifier paimonPath = toPaimonIdentifier(tablePath);
+ List<SchemaChange> paimonSchemaChanges =
+ toPaimonSchemaChanges(tableChanges,
this::getFlussPropertyKeyToPaimon);
+ alterTable(paimonPath, paimonSchemaChanges);
+ } catch (Catalog.ColumnAlreadyExistException |
Catalog.ColumnNotExistException e) {
+ // shouldn't happen before we support schema change
+ throw new RuntimeException(e);
+ }
+ }
+
private void createTable(Identifier tablePath, Schema schema)
throws Catalog.DatabaseNotExistException {
try {
@@ -116,6 +140,15 @@ public class PaimonLakeCatalog implements LakeCatalog {
}
}
+ private void alterTable(Identifier tablePath, List<SchemaChange>
tableChanges)
+ throws Catalog.ColumnAlreadyExistException,
Catalog.ColumnNotExistException {
+ try {
+ paimonCatalog.alterTable(tablePath, tableChanges, false);
+ } catch (Catalog.TableNotExistException e) {
+ throw new TableNotExistException("Table " + tablePath + " not
exists.");
+ }
+ }
+
private Identifier toPaimonIdentifier(TablePath tablePath) {
return Identifier.create(tablePath.getDatabaseName(),
tablePath.getTableName());
}
@@ -190,6 +223,14 @@ public class PaimonLakeCatalog implements LakeCatalog {
}
}
+ private String getFlussPropertyKeyToPaimon(String key) {
+ if (key.startsWith(PAIMON_CONF_PREFIX)) {
+ return key.substring(PAIMON_CONF_PREFIX.length());
+ } else {
+ return FLUSS_CONF_PREFIX + key;
+ }
+ }
+
@Override
public void close() {
IOUtils.closeQuietly(paimonCatalog, "paimon catalog");
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
index 72033f257..c456411e8 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
@@ -19,6 +19,7 @@ package org.apache.fluss.lake.paimon.utils;
import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.ChangeType;
import org.apache.fluss.row.GenericRow;
@@ -28,13 +29,16 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.List;
+import java.util.function.Function;
/** Utils for conversion between Paimon and Fluss. */
public class PaimonConversions {
@@ -106,4 +110,29 @@ public class PaimonConversions {
return org.apache.paimon.data.InternalRow.createFieldGetter(dataType,
0)
.getFieldOrNull(flussRowAsPaimonRow);
}
+
+ public static List<SchemaChange> toPaimonSchemaChanges(
+ List<TableChange> tableChanges, Function<String, String>
optionKeyTransformer) {
+ List<SchemaChange> schemaChanges = new
ArrayList<>(tableChanges.size());
+
+ for (TableChange tableChange : tableChanges) {
+ if (tableChange instanceof TableChange.SetOption) {
+ TableChange.SetOption setOption = (TableChange.SetOption)
tableChange;
+ schemaChanges.add(
+ SchemaChange.setOption(
+ optionKeyTransformer.apply(setOption.getKey()),
+ setOption.getValue()));
+ } else if (tableChange instanceof TableChange.ResetOption) {
+ TableChange.ResetOption resetOption =
(TableChange.ResetOption) tableChange;
+ schemaChanges.add(
+ SchemaChange.removeOption(
+
optionKeyTransformer.apply(resetOption.getKey())));
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported table change: " + tableChange.getClass());
+ }
+ }
+
+ return schemaChanges;
+ }
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 5e97a635a..99f33b05e 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -23,7 +23,9 @@ import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.InvalidConfigException;
import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.exception.LakeTableAlreadyExistException;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
@@ -113,6 +115,7 @@ class LakeEnabledTableCreateITCase {
throw new FlussRuntimeException("Failed to create warehouse path");
}
conf.setString("datalake.paimon.warehouse", warehousePath);
+ conf.setString("datalake.paimon.cache-enabled", "false");
paimonCatalog =
CatalogFactory.createCatalog(
CatalogContext.create(Options.fromMap(extractLakeProperties(conf))));
@@ -373,7 +376,7 @@ class LakeEnabledTableCreateITCase {
customProperties.put("k1", "v1");
customProperties.put("paimon.file.format", "parquet");
- // log table with lake disabled
+ // create log table with lake disabled
TableDescriptor logTable =
TableDescriptor.builder()
.schema(
@@ -429,6 +432,27 @@ class LakeEnabledTableCreateITCase {
}),
"log_c1,log_c2",
BUCKET_NUM);
+
+ // disable lake table
+ TableChange.SetOption disableLake =
+ TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"false");
+ changes = Collections.singletonList(disableLake);
+ admin.alterTable(logTablePath, changes, false).get();
+ // paimon table should still exist although lake is disabled
+ paimonCatalog.getTable(Identifier.create(DATABASE,
logTablePath.getTableName()));
+
+ // try to enable lake table again
+ enableLake =
TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+ List<TableChange> finalChanges = Collections.singletonList(enableLake);
+ // TODO: After #846 is implemented, we should remove this exception
assertion.
+ assertThatThrownBy(() -> admin.alterTable(logTablePath, finalChanges,
false).get())
+ .cause()
+ .isInstanceOf(LakeTableAlreadyExistException.class)
+ .hasMessage(
+ String.format(
+ "The table %s already exists in paimon
catalog, please "
+ + "first drop the table in paimon
catalog or use a new table name.",
+ logTablePath));
}
@Test
@@ -455,6 +479,106 @@ class LakeEnabledTableCreateITCase {
}
}
+ @Test
+ void testAlterLakeEnabledTableProperties() throws Exception {
+ Map<String, String> customProperties = new HashMap<>();
+ customProperties.put("k1", "v1");
+ customProperties.put("paimon.file.format", "parquet");
+
+ // create table
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder()
+ .schema(
+ Schema.newBuilder()
+ .column("c1", DataTypes.INT())
+ .column("c2", DataTypes.STRING())
+ .build())
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+ .customProperties(customProperties)
+ .distributedBy(BUCKET_NUM, "c1", "c2")
+ .build();
+ TablePath tablePath = TablePath.of(DATABASE, "alter_table");
+ admin.createTable(tablePath, tableDescriptor, false).get();
+ Table paimonTable =
+ paimonCatalog.getTable(Identifier.create(DATABASE,
tablePath.getTableName()));
+ verifyPaimonTable(
+ paimonTable,
+ tableDescriptor,
+ RowType.of(
+ new DataType[] {
+ org.apache.paimon.types.DataTypes.INT(),
+ org.apache.paimon.types.DataTypes.STRING(),
+ // for __bucket, __offset, __timestamp
+ org.apache.paimon.types.DataTypes.INT(),
+ org.apache.paimon.types.DataTypes.BIGINT(),
+
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+ },
+ new String[] {
+ "c1",
+ "c2",
+ BUCKET_COLUMN_NAME,
+ OFFSET_COLUMN_NAME,
+ TIMESTAMP_COLUMN_NAME
+ }),
+ "c1,c2",
+ BUCKET_NUM);
+
+ // test alter table properties
+ List<TableChange> tableChanges =
+ Arrays.asList(TableChange.reset("k1"), TableChange.set("k2",
"v2"));
+ admin.alterTable(tablePath, tableChanges, false).get();
+ paimonTable = paimonCatalog.getTable(Identifier.create(DATABASE,
tablePath.getTableName()));
+ customProperties.remove("k1");
+ customProperties.put("k2", "v2");
+ tableDescriptor =
+
tableDescriptor.withProperties(tableDescriptor.getProperties(),
customProperties);
+ verifyPaimonTable(
+ paimonTable,
+ tableDescriptor,
+ RowType.of(
+ new DataType[] {
+ org.apache.paimon.types.DataTypes.INT(),
+ org.apache.paimon.types.DataTypes.STRING(),
+ // for __bucket, __offset, __timestamp
+ org.apache.paimon.types.DataTypes.INT(),
+ org.apache.paimon.types.DataTypes.BIGINT(),
+
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+ },
+ new String[] {
+ "c1",
+ "c2",
+ BUCKET_COLUMN_NAME,
+ OFFSET_COLUMN_NAME,
+ TIMESTAMP_COLUMN_NAME
+ }),
+ "c1,c2",
+ BUCKET_NUM);
+
+ // test alter paimon properties, should throw exception
+ tableChanges =
Collections.singletonList(TableChange.set("paimon.bucket", "10"));
+ List<TableChange> finalTableChanges = tableChanges;
+ assertThatThrownBy(() -> admin.alterTable(tablePath,
finalTableChanges, false).get())
+ .cause()
+ .isInstanceOf(InvalidConfigException.class)
+ .hasMessage(
+ "Property 'paimon.bucket' is not supported to alter
which is for datalake table.");
+
+ // test alter table if lake table not exists
+ paimonCatalog.dropTable(Identifier.create(DATABASE,
tablePath.getTableName()), true);
+ tableChanges = Collections.singletonList(TableChange.set("k3", "v3"));
+ List<TableChange> finalTableChanges1 = tableChanges;
+ assertThatThrownBy(() -> admin.alterTable(tablePath,
finalTableChanges1, false).get())
+ .cause()
+ .isInstanceOf(FlussRuntimeException.class)
+ .hasMessageContaining(
+ "Lake table doesn't exists for lake-enabled table "
+ + tablePath
+ + ", which shouldn't be happened. Please check
if the lake table was deleted manually.");
+
+ // alter a not exist table when ignoreIfNotExists = true is ok
+ admin.alterTable(TablePath.of(DATABASE, "not_exist_table"),
tableChanges, true).get();
+ }
+
private void verifyPaimonTable(
Table paimonTable,
TableDescriptor flussTable,
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
new file mode 100644
index 000000000..c2275fe21
--- /dev/null
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.paimon;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableChange;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.types.DataTypes;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link PaimonLakeCatalog}. */
+class PaimonLakeCatalogTest {
+
+ @TempDir private File tempWarehouseDir;
+
+ private PaimonLakeCatalog flussPaimonCatalog;
+
+ @BeforeEach
+ public void setUp() {
+ Configuration configuration = new Configuration();
+ configuration.setString("warehouse",
tempWarehouseDir.toURI().toString());
+ flussPaimonCatalog = new PaimonLakeCatalog(configuration);
+ }
+
+ @Test
+ void testAlterTableConfigs() throws Exception {
+ String database = "test_alter_table_configs_db";
+ String tableName = "test_alter_table_configs_table";
+ TablePath tablePath = TablePath.of(database, tableName);
+ Identifier identifier = Identifier.create(database, tableName);
+ createTable(database, tableName);
+ Table table =
flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
+
+ // value should be null for key
+ assertThat(table.options().get("key")).isEqualTo(null);
+
+ // set the value for key
+ flussPaimonCatalog.alterTable(tablePath,
Arrays.asList(TableChange.set("key", "value")));
+
+ table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
+ // we have set the value for key
+ assertThat(table.options().get("fluss.key")).isEqualTo("value");
+
+ // reset the value for key
+ flussPaimonCatalog.alterTable(tablePath,
Arrays.asList(TableChange.reset("key")));
+
+ table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
+ // we have reset the value for key
+ assertThat(table.options().get("fluss.key")).isEqualTo(null);
+ }
+
+ private void createTable(String database, String tableName) {
+ Schema flussSchema =
+ Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("name", DataTypes.STRING())
+ .column("amount", DataTypes.INT())
+ .column("address", DataTypes.STRING())
+ .build();
+
+ TableDescriptor td =
+ TableDescriptor.builder()
+ .schema(flussSchema)
+ .distributedBy(3) // no bucket key
+ .build();
+
+ TablePath tablePath = TablePath.of(database, tableName);
+
+ flussPaimonCatalog.createTable(tablePath, td);
+ }
+}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 5e0c5dc2a..47cc555d2 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -21,6 +21,7 @@ import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.cluster.TabletServerInfo;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidAlterTableException;
import org.apache.fluss.exception.InvalidCoordinatorException;
import org.apache.fluss.exception.InvalidDatabaseException;
import org.apache.fluss.exception.InvalidTableException;
@@ -34,6 +35,7 @@ import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.PartitionSpec;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
@@ -112,6 +114,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
+import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
import static
org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilters;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.fromTablePath;
@@ -121,8 +124,8 @@ import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.getCommitRemot
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.getPartitionSpec;
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeCreateAclsResponse;
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeDropAclsResponse;
+import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.toTableChanges;
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTablePath;
-import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.toTablePropertyChanges;
import static
org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment;
import static org.apache.fluss.utils.PartitionUtils.validatePartitionSpec;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -298,11 +301,12 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
authorizer.authorize(currentSession(), OperationType.ALTER,
Resource.table(tablePath));
}
- TablePropertyChanges tablePropertyChanges =
- toTablePropertyChanges(request.getConfigChangesList());
+ List<TableChange> tableChanges =
toTableChanges(request.getConfigChangesList());
+ TablePropertyChanges tablePropertyChanges =
toTablePropertyChanges(tableChanges);
metadataManager.alterTableProperties(
tablePath,
+ tableChanges,
tablePropertyChanges,
request.isIgnoreIfNotExists(),
lakeCatalog,
@@ -312,6 +316,39 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
return CompletableFuture.completedFuture(new
AlterTablePropertiesResponse());
}
+ public static TablePropertyChanges
toTablePropertyChanges(List<TableChange> tableChanges) {
+ TablePropertyChanges.Builder builder = TablePropertyChanges.builder();
+ if (tableChanges.isEmpty()) {
+ return builder.build();
+ }
+
+ for (TableChange tableChange : tableChanges) {
+ if (tableChange instanceof TableChange.SetOption) {
+ TableChange.SetOption setOption = (TableChange.SetOption)
tableChange;
+ String optionKey = setOption.getKey();
+ if (isTableStorageConfig(optionKey)) {
+ builder.setTableProperty(optionKey, setOption.getValue());
+ } else {
+ // otherwise, it's considered as custom property
+ builder.setCustomProperty(optionKey, setOption.getValue());
+ }
+ } else if (tableChange instanceof TableChange.ResetOption) {
+ TableChange.ResetOption resetOption =
(TableChange.ResetOption) tableChange;
+ String optionKey = resetOption.getKey();
+ if (isTableStorageConfig(optionKey)) {
+ builder.resetTableProperty(optionKey);
+ } else {
+ // otherwise, it's considered as custom property
+ builder.resetCustomProperty(optionKey);
+ }
+ } else {
+ throw new InvalidAlterTableException(
+ "Unsupported alter table change: " + tableChange);
+ }
+ }
+ return builder.build();
+ }
+
private TableDescriptor applySystemDefaults(TableDescriptor
tableDescriptor) {
TableDescriptor newDescriptor = tableDescriptor;
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index 2056068a5..b1127ff39 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -40,6 +40,7 @@ import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DatabaseInfo;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
import org.apache.fluss.metadata.SchemaInfo;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePartition;
@@ -310,6 +311,7 @@ public class MetadataManager {
public void alterTableProperties(
TablePath tablePath,
+ List<TableChange> tableChanges,
TablePropertyChanges tablePropertyChanges,
boolean ignoreIfNotExists,
@Nullable LakeCatalog lakeCatalog,
@@ -340,7 +342,12 @@ public class MetadataManager {
// pre alter table properties, e.g. create lake table in lake
storage if it's to
// enable datalake for the table
preAlterTableProperties(
- tablePath, tableDescriptor, newDescriptor,
lakeCatalog, dataLakeFormat);
+ tablePath,
+ tableDescriptor,
+ newDescriptor,
+ tableChanges,
+ lakeCatalog,
+ dataLakeFormat);
// update the table to zk
TableRegistration updatedTableRegistration =
tableReg.newProperties(
@@ -377,27 +384,29 @@ public class MetadataManager {
TablePath tablePath,
TableDescriptor tableDescriptor,
TableDescriptor newDescriptor,
+ List<TableChange> tableChanges,
LakeCatalog lakeCatalog,
DataLakeFormat dataLakeFormat) {
-
- boolean toEnableDataLake =
- !isDataLakeEnabled(tableDescriptor) &&
isDataLakeEnabled(newDescriptor);
-
- // enable lake table
- if (toEnableDataLake) {
- // TODO: should tolerate if the lake exist but matches our schema.
This ensures
- // eventually
- // consistent by idempotently creating the table multiple times.
See #846
- // before create table in fluss, we may create in lake
+ if (isDataLakeEnabled(newDescriptor)) {
if (lakeCatalog == null) {
throw new InvalidAlterTableException(
"Cannot alter table "
+ tablePath
- + " to enable data lake, because the Fluss
cluster doesn't enable datalake tables.");
- } else {
+ + " in data lake, because the Fluss cluster
doesn't enable datalake tables.");
+ }
+
+ boolean isLakeTableNewlyCreated = false;
+ // to enable lake table
+ if (!isDataLakeEnabled(tableDescriptor)) {
+                // before creating the table in Fluss, we may create it in the lake
try {
lakeCatalog.createTable(tablePath, newDescriptor);
+ // no need to alter lake table if it is newly created
+ isLakeTableNewlyCreated = true;
} catch (TableAlreadyExistException e) {
+                // TODO: should tolerate the case where the lake table exists and matches our
schema. This ensures
+                // eventual consistency by idempotently creating the
table multiple times. See
+ // #846
throw new LakeTableAlreadyExistException(
String.format(
"The table %s already exists in %s
catalog, please "
@@ -405,8 +414,22 @@ public class MetadataManager {
tablePath, dataLakeFormat,
dataLakeFormat));
}
}
+
+ // only need to alter lake table if it is not newly created
+ if (!isLakeTableNewlyCreated) {
+ {
+ try {
+ lakeCatalog.alterTable(tablePath, tableChanges);
+ } catch (TableNotExistException e) {
+ throw new FlussRuntimeException(
+                                "Lake table doesn't exist for lake-enabled
table "
+                                        + tablePath
+                                        + ", which shouldn't happen.
Please check if the lake table was deleted manually.",
+ e);
+ }
+ }
+ }
}
- // more pre-alter actions can be added here
}
private void postAlterTableProperties(
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index e9c055ed4..fcf2a6b9d 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -21,7 +21,6 @@ import org.apache.fluss.cluster.Endpoint;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.config.ConfigOptions;
-import org.apache.fluss.exception.InvalidAlterTableException;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.fs.token.ObtainedSecurityToken;
import org.apache.fluss.metadata.AlterConfigOpType;
@@ -148,7 +147,6 @@ import
org.apache.fluss.server.entity.NotifyLeaderAndIsrResultForBucket;
import org.apache.fluss.server.entity.NotifyRemoteLogOffsetsData;
import org.apache.fluss.server.entity.StopReplicaData;
import org.apache.fluss.server.entity.StopReplicaResultForBucket;
-import org.apache.fluss.server.entity.TablePropertyChanges;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshotJsonSerde;
import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle;
@@ -179,7 +177,6 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toByteBuffer;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toPbAclInfo;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -261,43 +258,11 @@ public class ServerRpcMessageUtils {
}
}
- public static TablePropertyChanges
toTablePropertyChanges(List<PbAlterConfig> alterConfigs) {
- TablePropertyChanges.Builder builder = TablePropertyChanges.builder();
- if (alterConfigs.isEmpty()) {
- return builder.build();
- }
-
- List<TableChange> tableChanges =
- alterConfigs.stream()
- .filter(Objects::nonNull)
- .map(ServerRpcMessageUtils::toTableChange)
- .collect(Collectors.toList());
-
- for (TableChange tableChange : tableChanges) {
- if (tableChange instanceof TableChange.SetOption) {
- TableChange.SetOption setOption = (TableChange.SetOption)
tableChange;
- String optionKey = setOption.getKey();
- if (isTableStorageConfig(optionKey)) {
- builder.setTableProperty(optionKey, setOption.getValue());
- } else {
- // otherwise, it's considered as custom property
- builder.setCustomProperty(optionKey, setOption.getValue());
- }
- } else if (tableChange instanceof TableChange.ResetOption) {
- TableChange.ResetOption resetOption =
(TableChange.ResetOption) tableChange;
- String optionKey = resetOption.getKey();
- if (isTableStorageConfig(optionKey)) {
- builder.resetTableProperty(optionKey);
- } else {
- // otherwise, it's considered as custom property
- builder.resetCustomProperty(optionKey);
- }
- } else {
- throw new InvalidAlterTableException(
- "Unsupported alter table change: " + tableChange);
- }
- }
- return builder.build();
+ public static List<TableChange> toTableChanges(List<PbAlterConfig>
alterConfigs) {
+ return alterConfigs.stream()
+ .filter(Objects::nonNull)
+ .map(ServerRpcMessageUtils::toTableChange)
+ .collect(Collectors.toList());
}
public static MetadataResponse buildMetadataResponse(
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
index 74962c486..deb2cddb0 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
@@ -19,16 +19,19 @@ package org.apache.fluss.server.lakehouse;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.TableAlreadyExistException;
+import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.lake.lakestorage.LakeCatalog;
import org.apache.fluss.lake.lakestorage.LakeStorage;
import org.apache.fluss.lake.lakestorage.LakeStoragePlugin;
import org.apache.fluss.lake.source.LakeSource;
import org.apache.fluss.lake.writer.LakeTieringFactory;
import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/** A plugin of paimon just for testing purpose. */
@@ -79,6 +82,12 @@ public class TestingPaimonStoragePlugin implements
LakeStoragePlugin {
tableByPath.put(tablePath, tableDescriptor);
}
+ @Override
+ public void alterTable(TablePath tablePath, List<TableChange>
tableChanges)
+ throws TableNotExistException {
+ // do nothing
+ }
+
public TableDescriptor getTable(TablePath tablePath) {
return tableByPath.get(tablePath);
}