This is an automated email from the ASF dual-hosted git repository. hongshun pushed a commit to branch dlf-2.5 in repository https://gitbox.apache.org/repos/asf/fluss.git
commit dfe2176291ecbbebc613cf5e73b3bbc19a080bb8 Author: luoyuxia <[email protected]> AuthorDate: Mon Aug 18 17:45:44 2025 +0800 support enable lake dynamically --- .../com/alibaba/fluss/client/table/FlussTable.java | 18 +++++ .../java/com/alibaba/fluss/client/table/Table.java | 19 ++++++ .../fluss/client/utils/ClientRpcMessageUtils.java | 22 ++++++ .../com/alibaba/fluss/config/FlussConfigUtils.java | 8 +++ .../exception/InvalidAlterTableException.java | 26 ++++++++ .../alibaba/fluss/metadata/UpdateProperties.java | 74 ++++++++++++++++++++ .../lake/paimon/LakeEnabledTableCreateITCase.java | 32 +++++++++ .../alibaba/fluss/rpc/gateway/AdminGateway.java | 10 +++ .../com/alibaba/fluss/rpc/protocol/ApiKeys.java | 3 +- .../com/alibaba/fluss/rpc/protocol/Errors.java | 5 +- fluss-rpc/src/main/proto/FlussApi.proto | 13 ++++ .../server/coordinator/CoordinatorService.java | 11 +++ .../fluss/server/coordinator/MetadataManager.java | 78 ++++++++++++++++++++++ .../fluss/server/entity/AlterTableData.java | 42 ++++++++++++ .../fluss/server/utils/ServerRpcMessageUtils.java | 17 +++++ .../fluss/server/zk/data/TableRegistration.java | 12 ++++ .../server/coordinator/TestCoordinatorGateway.java | 7 ++ 17 files changed, 395 insertions(+), 2 deletions(-) diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java index aa894d55c..a86f82045 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java @@ -29,7 +29,13 @@ import com.alibaba.fluss.client.table.writer.TableUpsert; import com.alibaba.fluss.client.table.writer.Upsert; import com.alibaba.fluss.metadata.TableInfo; import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.metadata.UpdateProperties; +import com.alibaba.fluss.rpc.GatewayClientProxy; +import com.alibaba.fluss.rpc.gateway.AdminGateway; +import 
java.util.concurrent.CompletableFuture; + +import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeAlterTableRequest; import static com.alibaba.fluss.utils.Preconditions.checkState; /** @@ -86,6 +92,18 @@ public class FlussTable implements Table { return new TableUpsert(tablePath, tableInfo, conn.getOrCreateWriterClient()); } + @Override + public CompletableFuture<Void> updateProperties(UpdateProperties updateProperties) { + AdminGateway adminGateway = + GatewayClientProxy.createGatewayProxy( + () -> conn.getMetadataUpdater().getCoordinatorServer(), + conn.getRpcClient(), + AdminGateway.class); + return adminGateway + .alterTable(makeAlterTableRequest(tablePath, updateProperties)) + .thenApply(r -> null); + } + @Override public void close() throws Exception { // do nothing diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java index 573a6580d..e1505cb7a 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java @@ -26,7 +26,12 @@ import com.alibaba.fluss.client.table.writer.Append; import com.alibaba.fluss.client.table.writer.AppendWriter; import com.alibaba.fluss.client.table.writer.Upsert; import com.alibaba.fluss.client.table.writer.UpsertWriter; +import com.alibaba.fluss.exception.InvalidAlterTableException; +import com.alibaba.fluss.exception.TableNotExistException; import com.alibaba.fluss.metadata.TableInfo; +import com.alibaba.fluss.metadata.UpdateProperties; + +import java.util.concurrent.CompletableFuture; /** * Used to communicate with a single Fluss table. Obtain an instance from a {@link Connection}. @@ -72,4 +77,18 @@ public interface Table extends AutoCloseable { * this table (requires to be a Primary Key Table). */ Upsert newUpsert(); + + /** + * Updates the properties of this table. 
+ * + * <p>The following exceptions can be anticipated when calling {@code get()} on returned future. + * + * <ul> + * <li>{@link TableNotExistException} if the table does not exist. + * <li>{@link InvalidAlterTableException} if the update properties is invalid. + * </ul> + * + * @param updateProperties the update properties. + */ + CompletableFuture<Void> updateProperties(UpdateProperties updateProperties); } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java b/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java index bfc36f6fd..76f81cf78 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java @@ -33,6 +33,8 @@ import com.alibaba.fluss.metadata.PartitionSpec; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.metadata.UpdateProperties; +import com.alibaba.fluss.rpc.messages.AlterTableRequest; import com.alibaba.fluss.rpc.messages.CreatePartitionRequest; import com.alibaba.fluss.rpc.messages.DropPartitionRequest; import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenResponse; @@ -52,6 +54,7 @@ import com.alibaba.fluss.rpc.messages.PbPrefixLookupReqForBucket; import com.alibaba.fluss.rpc.messages.PbProduceLogReqForBucket; import com.alibaba.fluss.rpc.messages.PbPutKvReqForBucket; import com.alibaba.fluss.rpc.messages.PbRemotePathAndLocalFile; +import com.alibaba.fluss.rpc.messages.PbUpdateProperties; import com.alibaba.fluss.rpc.messages.PrefixLookupRequest; import com.alibaba.fluss.rpc.messages.ProduceLogRequest; import com.alibaba.fluss.rpc.messages.PutKvRequest; @@ -344,4 +347,23 @@ public class ClientRpcMessageUtils { (key, value) -> pbKeyValues.add(new PbKeyValue().setKey(key).setValue(value))); return new 
PbPartitionSpec().addAllPartitionKeyValues(pbKeyValues); } + + public static AlterTableRequest makeAlterTableRequest( + TablePath tablePath, UpdateProperties updateProperties) { + AlterTableRequest request = new AlterTableRequest(); + request.setTablePath() + .setDatabaseName(tablePath.getDatabaseName()) + .setTableName(tablePath.getTableName()); + PbUpdateProperties pbUpdateProperties = new PbUpdateProperties(); + pbUpdateProperties.addAllSetProperties( + updateProperties.getSetProperties().entrySet().stream() + .map( + entry -> + new PbKeyValue() + .setKey(entry.getKey()) + .setValue(entry.getValue())) + .collect(Collectors.toList())); + request.setUpdateProperties(pbUpdateProperties); + return request; + } } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/com/alibaba/fluss/config/FlussConfigUtils.java index 779a0c7f4..cca591968 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/config/FlussConfigUtils.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/config/FlussConfigUtils.java @@ -21,9 +21,13 @@ import com.alibaba.fluss.annotation.Internal; import com.alibaba.fluss.annotation.VisibleForTesting; import java.lang.reflect.Field; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import static com.alibaba.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED; + /** Utilities of Fluss {@link ConfigOptions}. */ @Internal public class FlussConfigUtils { @@ -38,6 +42,10 @@ public class FlussConfigUtils { CLIENT_OPTIONS = extractConfigOptions("client."); } + // table properties that support alter. 
+ public static final List<String> TABLE_OPTIONS_SUPPORT_ALTER = + Collections.singletonList(TABLE_DATALAKE_ENABLED.key()); + @VisibleForTesting static Map<String, ConfigOption<?>> extractConfigOptions(String prefix) { Map<String, ConfigOption<?>> options = new HashMap<>(); diff --git a/fluss-common/src/main/java/com/alibaba/fluss/exception/InvalidAlterTableException.java b/fluss-common/src/main/java/com/alibaba/fluss/exception/InvalidAlterTableException.java new file mode 100644 index 000000000..6a562b8f8 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/exception/InvalidAlterTableException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.exception; + +/** Exception for invalid alter table. 
*/ +public class InvalidAlterTableException extends ApiException { + public InvalidAlterTableException(String message) { + super(message); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/UpdateProperties.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/UpdateProperties.java new file mode 100644 index 000000000..05d1230d6 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/UpdateProperties.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.metadata; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Describes the table properties to set and the property keys to reset. */ +public class UpdateProperties { + + private final List<String> resetProperties; + private final Map<String, String> setProperties; + + private UpdateProperties(List<String> resetProperties, Map<String, String> setProperties) { + this.resetProperties = resetProperties; + this.setProperties = setProperties; + } + + /** Creates a builder for building update property data. 
*/ + public static Builder builder() { + return new Builder(); + } + + public List<String> getResetProperties() { + return resetProperties; + } + + public Map<String, String> getSetProperties() { + return setProperties; + } + + /** Builder for {@link UpdateProperties}. */ + public static class Builder { + private final List<String> resetProperties; + private final Map<String, String> setProperties; + + protected Builder() { + this.resetProperties = new ArrayList<>(); + this.setProperties = new HashMap<>(); + } + + public Builder resetProperty(String property) { + this.resetProperties.add(property); + return this; + } + + public Builder setProperty(String property, String value) { + this.setProperties.put(property, value); + return this; + } + + public UpdateProperties build() { + return new UpdateProperties(resetProperties, setProperties); + } + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/LakeEnabledTableCreateITCase.java index 7123c9120..76b208b69 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/LakeEnabledTableCreateITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/LakeEnabledTableCreateITCase.java @@ -27,6 +27,7 @@ import com.alibaba.fluss.exception.InvalidTableException; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.metadata.UpdateProperties; import com.alibaba.fluss.server.testutils.FlussClusterExtension; import com.alibaba.fluss.types.DataTypes; @@ -117,6 +118,37 @@ class LakeEnabledTableCreateITCase { return conf; } + @Test + void testDynamicEnableLake() throws Exception { + TableDescriptor logTable = + TableDescriptor.builder() + .schema(Schema.newBuilder().column("log_c1", DataTypes.INT()).build()) + .build(); + 
TablePath logTablePath = TablePath.of(DATABASE, "log_table"); + admin.createTable(logTablePath, logTable, false).get(); + + com.alibaba.fluss.client.table.Table table = conn.getTable(logTablePath); + table.updateProperties( + UpdateProperties.builder() + .setProperty(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") + .build()) + .get(); + assertThat(conn.getTable(logTablePath).getTableInfo().getTableConfig().isDataLakeEnabled()) + .isTrue(); + + Table paimonLogTable = + paimonCatalog.getTable(Identifier.create(DATABASE, logTablePath.getTableName())); + System.out.println(paimonLogTable); + + table.updateProperties( + UpdateProperties.builder() + .setProperty(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false") + .build()) + .get(); + assertThat(conn.getTable(logTablePath).getTableInfo().getTableConfig().isDataLakeEnabled()) + .isFalse(); + } + @Test void testCreateLakeEnabledTable() throws Exception { Map<String, String> customProperties = new HashMap<>(); diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java index 7012441d1..53e1f720d 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java @@ -19,6 +19,8 @@ package com.alibaba.fluss.rpc.gateway; import com.alibaba.fluss.rpc.messages.AlterConfigsRequest; import com.alibaba.fluss.rpc.messages.AlterConfigsResponse; +import com.alibaba.fluss.rpc.messages.AlterTableRequest; +import com.alibaba.fluss.rpc.messages.AlterTableResponse; import com.alibaba.fluss.rpc.messages.CreateAclsRequest; import com.alibaba.fluss.rpc.messages.CreateAclsResponse; import com.alibaba.fluss.rpc.messages.CreateDatabaseRequest; @@ -109,6 +111,14 @@ public interface AdminGateway extends AdminReadOnlyGateway { @RPC(api = ApiKeys.ALTER_CONFIGS) CompletableFuture<AlterConfigsResponse> alterConfigs(AlterConfigsRequest request); + /** + 
* Alter table, like alter table properties and customProperties. + * + * @param request Alter table request + */ + @RPC(api = ApiKeys.ALTER_TABLE) + CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest request); + // todo: rename table & alter table } diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java index 1f43442c0..31481f233 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java @@ -72,7 +72,8 @@ public enum ApiKeys { DROP_ACLS(1041, 0, 0, PUBLIC), LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE), DESCRIBE_CONFIGS(1043, 0, 0, PUBLIC), - ALTER_CONFIGS(1044, 0, 0, PUBLIC); + ALTER_CONFIGS(1044, 0, 0, PUBLIC), + ALTER_TABLE(1045, 0, 0, PUBLIC); private static final Map<Integer, ApiKeys> ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java index 63a46a693..5fca604bf 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java @@ -28,6 +28,7 @@ import com.alibaba.fluss.exception.DatabaseNotExistException; import com.alibaba.fluss.exception.DuplicateSequenceException; import com.alibaba.fluss.exception.FencedLeaderEpochException; import com.alibaba.fluss.exception.FencedTieringEpochException; +import com.alibaba.fluss.exception.InvalidAlterTableException; import com.alibaba.fluss.exception.InvalidColumnProjectionException; import com.alibaba.fluss.exception.InvalidConfigException; import com.alibaba.fluss.exception.InvalidCoordinatorException; @@ -219,7 +220,9 @@ public enum Errors { INVALID_REQUEST( 54, "This most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible 
server. See the server logs for more details.", - InvalidRequestException::new); + InvalidRequestException::new), + INVALID_ALTER_TABLE_EXCEPTION( + 40, "The alter table is invalid.", InvalidAlterTableException::new); private static final Logger LOG = LoggerFactory.getLogger(Errors.class); diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index 1eef39f10..df44f80cd 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -561,6 +561,19 @@ message AlterConfigsResponse{ } +message AlterTableRequest { + required PbTablePath table_path = 1; + optional PbUpdateProperties update_properties = 2; +} + +message AlterTableResponse { +} + +message PbUpdateProperties { + repeated string remove_properties = 1; + repeated PbKeyValue set_properties = 2; +} + // --------------- Inner classes ---------------- message PbApiVersion { diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java index 472c11853..622b844cc 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java @@ -45,6 +45,8 @@ import com.alibaba.fluss.rpc.messages.AdjustIsrRequest; import com.alibaba.fluss.rpc.messages.AdjustIsrResponse; import com.alibaba.fluss.rpc.messages.AlterConfigsRequest; import com.alibaba.fluss.rpc.messages.AlterConfigsResponse; +import com.alibaba.fluss.rpc.messages.AlterTableRequest; +import com.alibaba.fluss.rpc.messages.AlterTableResponse; import com.alibaba.fluss.rpc.messages.CommitKvSnapshotRequest; import com.alibaba.fluss.rpc.messages.CommitKvSnapshotResponse; import com.alibaba.fluss.rpc.messages.CommitLakeTableSnapshotRequest; @@ -124,6 +126,7 @@ import static com.alibaba.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilte 
import static com.alibaba.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings; import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.fromTablePath; import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getAdjustIsrData; +import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getAlterTableData; import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getCommitLakeTableSnapshotData; import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getCommitRemoteLogManifestData; import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getPartitionSpec; @@ -698,6 +701,14 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return future; } + @Override + public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest request) { + AlterTableResponse response = new AlterTableResponse(); + metadataManager.alterTable( + getAlterTableData(request), lakeCatalogDynamicLoader, lakeTableTieringManager); + return CompletableFuture.completedFuture(response); + } + @VisibleForTesting public DataLakeFormat getDataLakeFormat() { return lakeCatalogDynamicLoader.getDataLakeFormat(); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java index d6192fc01..921b839f0 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java @@ -23,6 +23,7 @@ import com.alibaba.fluss.exception.DatabaseAlreadyExistException; import com.alibaba.fluss.exception.DatabaseNotEmptyException; import com.alibaba.fluss.exception.DatabaseNotExistException; import com.alibaba.fluss.exception.FlussRuntimeException; +import com.alibaba.fluss.exception.InvalidAlterTableException; import com.alibaba.fluss.exception.InvalidPartitionException; import 
com.alibaba.fluss.exception.PartitionAlreadyExistsException; import com.alibaba.fluss.exception.PartitionNotExistException; @@ -40,6 +41,8 @@ import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TableInfo; import com.alibaba.fluss.metadata.TablePartition; import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.metadata.UpdateProperties; +import com.alibaba.fluss.server.entity.AlterTableData; import com.alibaba.fluss.server.utils.LakeStorageUtils; import com.alibaba.fluss.server.zk.ZooKeeperClient; import com.alibaba.fluss.server.zk.data.DatabaseRegistration; @@ -55,13 +58,17 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; +import static com.alibaba.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED; +import static com.alibaba.fluss.config.FlussConfigUtils.TABLE_OPTIONS_SUPPORT_ALTER; import static com.alibaba.fluss.server.utils.TableDescriptorValidation.validateTableDescriptor; +import static com.alibaba.fluss.utils.Preconditions.checkNotNull; /** A manager for metadata. */ public class MetadataManager { @@ -164,6 +171,77 @@ public class MetadataManager { return listPartitions(tablePath, null); } + public void alterTable( + AlterTableData alterTableData, + LakeCatalogDynamicLoader lakeCatalogDynamicLoader, + LakeTableTieringManager lakeTableTieringManager) { + TablePath tablePath = alterTableData.getTablePath(); + if (!tableExists(tablePath)) { + throw new TableNotExistException("Table " + tablePath + " does not exist."); + } + + TableRegistration oldTableReg = getTableRegistration(tablePath); + + // validate and apply update properties. 
+ TableRegistration newTableReg = + validateAndApplyUpdateProperties( + oldTableReg, alterTableData.getUpdatePropertiesData()); + + Configuration newConfig = + Configuration.fromMap(alterTableData.getUpdatePropertiesData().getSetProperties()); + + boolean isEnabledDataLake = false; + if (newConfig.get(TABLE_DATALAKE_ENABLED) + && !Configuration.fromMap(oldTableReg.properties).get(TABLE_DATALAKE_ENABLED)) { + try { + TableDescriptor newTableDescriptor = + getTable(tablePath) + .toTableDescriptor() + .withProperties(newTableReg.properties); + checkNotNull(lakeCatalogDynamicLoader.getLakeCatalog()) + .createTable(tablePath, newTableDescriptor); + isEnabledDataLake = true; + } catch (TableAlreadyExistException e) { + throw new TableAlreadyExistException( + String.format( + "The table %s already exists in %s catalog, please " + + "first drop the table in %s catalog or use a new table name.", + tablePath, + lakeCatalogDynamicLoader.getDataLakeFormat(), + lakeCatalogDynamicLoader.getDataLakeFormat())); + } + } else if (!newConfig.get(TABLE_DATALAKE_ENABLED)) { + lakeTableTieringManager.removeLakeTable(oldTableReg.tableId); + } + + uncheck( + () -> zookeeperClient.updateTable(tablePath, newTableReg), + "Fail to alter table: " + tablePath); + + if (isEnabledDataLake) { + lakeTableTieringManager.addNewLakeTable(getTable(tablePath)); + } + } + + private TableRegistration validateAndApplyUpdateProperties( + TableRegistration oldTableReg, UpdateProperties updatePropertiesData) { + Map<String, String> properties = new HashMap<>(oldTableReg.properties); + // 1. set table properties. 
+ for (Map.Entry<String, String> setProperty : + updatePropertiesData.getSetProperties().entrySet()) { + validateSetTableProperty(setProperty.getKey()); + properties.put(setProperty.getKey(), setProperty.getValue()); + } + return oldTableReg.copy(properties); + } + + private void validateSetTableProperty(String setKey) { + if (!TABLE_OPTIONS_SUPPORT_ALTER.contains(setKey)) { + throw new InvalidAlterTableException( + "Update table property: '" + setKey + "' is not supported yet."); + } + } + /** * List the partitions of the given table and partitionSpec. * diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/entity/AlterTableData.java b/fluss-server/src/main/java/com/alibaba/fluss/server/entity/AlterTableData.java new file mode 100644 index 000000000..932891ba0 --- /dev/null +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/entity/AlterTableData.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.server.entity; + +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.metadata.UpdateProperties; +import com.alibaba.fluss.rpc.messages.AlterTableRequest; + +/** The data for request {@link AlterTableRequest}. 
*/ +public class AlterTableData { + private final TablePath tablePath; + private final UpdateProperties updateProperties; + // TODO add more alter table type like add column, change schema etc. + public AlterTableData(TablePath tablePath, UpdateProperties updateProperties) { + this.tablePath = tablePath; + this.updateProperties = updateProperties; + } + + public TablePath getTablePath() { + return tablePath; + } + + public UpdateProperties getUpdatePropertiesData() { + return updateProperties; + } +} diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java index 034163493..fc1ce7345 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java @@ -30,6 +30,7 @@ import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TableInfo; import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.metadata.UpdateProperties; import com.alibaba.fluss.record.BytesViewLogRecords; import com.alibaba.fluss.record.DefaultKvRecordBatch; import com.alibaba.fluss.record.DefaultValueRecordBatch; @@ -49,6 +50,7 @@ import com.alibaba.fluss.rpc.entity.ProduceLogResultForBucket; import com.alibaba.fluss.rpc.entity.PutKvResultForBucket; import com.alibaba.fluss.rpc.messages.AdjustIsrRequest; import com.alibaba.fluss.rpc.messages.AdjustIsrResponse; +import com.alibaba.fluss.rpc.messages.AlterTableRequest; import com.alibaba.fluss.rpc.messages.CommitKvSnapshotRequest; import com.alibaba.fluss.rpc.messages.CommitLakeTableSnapshotRequest; import com.alibaba.fluss.rpc.messages.CommitRemoteLogManifestRequest; @@ -116,6 +118,7 @@ import com.alibaba.fluss.rpc.messages.PbStopReplicaRespForBucket; import com.alibaba.fluss.rpc.messages.PbTableBucket; import 
com.alibaba.fluss.rpc.messages.PbTableMetadata; import com.alibaba.fluss.rpc.messages.PbTablePath; +import com.alibaba.fluss.rpc.messages.PbUpdateProperties; import com.alibaba.fluss.rpc.messages.PbValue; import com.alibaba.fluss.rpc.messages.PbValueList; import com.alibaba.fluss.rpc.messages.PrefixLookupRequest; @@ -133,6 +136,7 @@ import com.alibaba.fluss.security.acl.AclBinding; import com.alibaba.fluss.server.authorizer.AclCreateResult; import com.alibaba.fluss.server.authorizer.AclDeleteResult; import com.alibaba.fluss.server.entity.AdjustIsrResultForBucket; +import com.alibaba.fluss.server.entity.AlterTableData; import com.alibaba.fluss.server.entity.CommitLakeTableSnapshotData; import com.alibaba.fluss.server.entity.CommitRemoteLogManifestData; import com.alibaba.fluss.server.entity.FetchReqInfo; @@ -1626,4 +1630,17 @@ public class ServerRpcMessageUtils { result.addAll(errors); return result; } + + public static AlterTableData getAlterTableData(AlterTableRequest request) { + PbTablePath pbTablePath = request.getTablePath(); + TablePath tablePath = + TablePath.of(pbTablePath.getDatabaseName(), pbTablePath.getTableName()); + PbUpdateProperties pbUpdateProperties = request.getUpdateProperties(); + UpdateProperties.Builder builder = UpdateProperties.builder(); + for (PbKeyValue setProperty : pbUpdateProperties.getSetPropertiesList()) { + builder.setProperty(setProperty.getKey(), setProperty.getValue()); + } + + return new AlterTableData(tablePath, builder.build()); + } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/TableRegistration.java b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/TableRegistration.java index 175a56c93..d66a75501 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/TableRegistration.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/TableRegistration.java @@ -132,6 +132,18 @@ public class TableRegistration { currentMillis); } + public TableRegistration 
copy(Map<String, String> newProperties) { + return new TableRegistration( + tableId, + comment, + partitionKeys, + new TableDistribution(bucketCount, bucketKeys), + newProperties, + customProperties, + createdTime, + System.currentTimeMillis()); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java index 423d89efc..1afc23782 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java @@ -24,6 +24,8 @@ import com.alibaba.fluss.rpc.messages.AdjustIsrRequest; import com.alibaba.fluss.rpc.messages.AdjustIsrResponse; import com.alibaba.fluss.rpc.messages.AlterConfigsRequest; import com.alibaba.fluss.rpc.messages.AlterConfigsResponse; +import com.alibaba.fluss.rpc.messages.AlterTableRequest; +import com.alibaba.fluss.rpc.messages.AlterTableResponse; import com.alibaba.fluss.rpc.messages.ApiVersionsRequest; import com.alibaba.fluss.rpc.messages.ApiVersionsResponse; import com.alibaba.fluss.rpc.messages.CommitKvSnapshotRequest; @@ -320,6 +322,11 @@ public class TestCoordinatorGateway implements CoordinatorGateway { throw new UnsupportedOperationException(); } + @Override + public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest request) { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture<DescribeConfigsResponse> describeConfigs( DescribeConfigsRequest request) {
