luoyuxia commented on code in PR #1625:
URL: https://github.com/apache/fluss/pull/1625#discussion_r2370978665


##########
fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java:
##########
@@ -235,6 +236,25 @@ CompletableFuture<Void> createTable(
      */
     CompletableFuture<List<String>> listTables(String databaseName);
 
+    /**
+     * Alter a table.
+     *
+     * <p>The following exceptions can be anticipated when calling {@code 
get()} on returned future.
+     *
+     * <ul>
+     *   <li>{@link DatabaseNotExistException} when the database does not 
exist.
+     *   <li>{@link TableNotExistException} when the table does not exist, if 
ignoreIfNotExists is

Review Comment:
   ```suggestion
        *   <li>{@link TableNotExistException} when the table does not exist, 
and ignoreIfNotExists is
   ```



##########
fluss-common/src/main/java/org/apache/fluss/metadata/AlterTableConfigsOpType.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+/** The operation type of altering table configurations. */
+public enum AlterTableConfigsOpType {
+    SET(1),

Review Comment:
   as the 
[FIP](https://cwiki.apache.org/confluence/display/FLUSS/FIP-15%3A+Alter+Table+Interface)
 said:
   ```
   // SET=0, DELETE=1, APPEND=2, SUBTRACT=3
   ```



##########
fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java:
##########
@@ -235,6 +236,25 @@ CompletableFuture<Void> createTable(
      */
     CompletableFuture<List<String>> listTables(String databaseName);
 
+    /**
+     * Alter a table.

Review Comment:
   nit:
   ```suggestion
        * Alter a table with the given {@code tableChanges}.
   ```



##########
fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java:
##########
@@ -33,9 +35,15 @@ public class FlussConfigUtils {
     public static final String CLIENT_PREFIX = "client.";
     public static final String CLIENT_SECURITY_PREFIX = "client.security.";
 
+    public static final List<String> ALTERABLE_TABLE_CONFIG;
+    public static final List<String> ALTERABLE_CLIENT_OPTIONS;
+
     static {
         TABLE_OPTIONS = extractConfigOptions("table.");
         CLIENT_OPTIONS = extractConfigOptions("client.");
+        ALTERABLE_TABLE_CONFIG = Collections.emptyList();
+        ALTERABLE_CLIENT_OPTIONS =

Review Comment:
   why add the option in here. IIUC, this pr is just introduce an interface to 
allow client to alter table property, but what properties can be changed 
shouldn't happend in this pr.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java:
##########
@@ -398,11 +402,39 @@ public void createTable(ObjectPath objectPath, 
CatalogBaseTable table, boolean i
     }
 
     @Override
-    public void alterTable(ObjectPath objectPath, CatalogBaseTable 
catalogBaseTable, boolean b)
+    public void alterTable(
+            ObjectPath objectPath,
+            CatalogBaseTable newTable,
+            List<org.apache.flink.table.catalog.TableChange> tableChanges,
+            boolean ignoreIfNotExists)
             throws TableNotExistException, CatalogException {
-        throw new UnsupportedOperationException();
+        TablePath tablePath = toTablePath(objectPath);
+
+        List<TableChange> flussTableChanges =
+                tableChanges.stream()
+                        .filter(Objects::nonNull)
+                        
.map(FlinkTableChangeToFlussTableChange::toFlussTableChange)
+                        .collect(Collectors.toList());
+        try {
+            admin.alterTable(tablePath, flussTableChanges, 
ignoreIfNotExists).get();
+        } catch (Exception e) {
+            Throwable t = ExceptionUtils.stripExecutionException(e);
+            if (CatalogExceptionUtils.isTableNotExist(t)) {
+                throw new TableNotExistException(getName(), objectPath);
+            } else if (isTableInvalid(t)) {
+                throw new InvalidTableException(t.getMessage());
+            } else {
+                throw new CatalogException(
+                        String.format("Failed to create table %s in %s", 
objectPath, getName()), t);
+            }
+        }
     }
 
+    @Override
+    public void alterTable(

Review Comment:
   why overwrite this method with empty body?



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java:
##########
@@ -302,6 +306,101 @@ public long createTable(
                 "Fail to create table " + tablePath);
     }
 
+    public void alterTableProperties(
+            TablePath tablePath,
+            List<TableChange.SetOption> setOptions,
+            List<TableChange.ResetOption> resetOptions,
+            boolean ignoreIfNotExists) {
+
+        if (!databaseExists(tablePath.getDatabaseName())) {
+            throw new DatabaseNotExistException(
+                    "Database " + tablePath.getDatabaseName() + " does not 
exist.");
+        }
+        if (!tableExists(tablePath)) {
+            if (ignoreIfNotExists) {
+                return;
+            } else {
+                throw new TableNotExistException("Table " + tablePath + " does 
not exists.");
+            }
+        }
+
+        try {
+            TableRegistration updatedTableRegistration =
+                    getUpdatedTableRegistration(tablePath, setOptions, 
resetOptions);
+            if (updatedTableRegistration != null) {
+                zookeeperClient.updateTable(tablePath, 
updatedTableRegistration);
+            } else {
+                LOG.info(
+                        "No properties changed when alter table {}, skip 
update table.", tablePath);
+            }
+        } catch (Exception e) {
+            if (e instanceof KeeperException.NoNodeException) {
+                if (ignoreIfNotExists) {
+                    return;
+                }
+                throw new TableNotExistException("Table " + tablePath + " does 
not exists.");
+            } else {
+                throw new FlussRuntimeException("Failed to alter table: " + 
tablePath, e);
+            }
+        }
+    }
+
+    private TableRegistration getUpdatedTableRegistration(
+            TablePath tablePath,
+            List<TableChange.SetOption> setOptions,
+            List<TableChange.ResetOption> resetOptions) {
+
+        TableRegistration existTableReg = getTableRegistration(tablePath);
+
+        Map<String, String> newProperties = new 
HashMap<>(existTableReg.properties);
+        Map<String, String> newCustomProperties = new 
HashMap<>(existTableReg.customProperties);
+
+        boolean propertiesChanged = false;
+        boolean customPropertiesChanged = false;
+        for (TableChange.SetOption setOption : setOptions) {
+            String key = setOption.getKey();
+            if (ALTERABLE_TABLE_CONFIG.contains(key)) {
+                // only alterable configs can be updated, other properties 
keep unchanged.
+                String curValue = newProperties.get(key);
+                String updatedValue = setOption.getValue();
+                if (!updatedValue.equals(curValue)) {
+                    propertiesChanged = true;
+                    newProperties.put(key, updatedValue);
+                }
+            } else if (ALTERABLE_CLIENT_OPTIONS.contains(key)) {

Review Comment:
   Why should we take `ALTERABLE_CLIENT_OPTIONS` as a branch? If the property 
key is not in `TABLE_OPTIONS`, then it can be considered as custom options.



##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -108,6 +108,24 @@ message CreateTableRequest {
 message CreateTableResponse {
 }
 
+// alter table request and response
+message AlterTableConfigsRequest {
+  required PbTablePath table_path = 1;
+  required bool ignore_if_not_exists = 2;
+  repeated PbAlterConfigsRequestInfo config_changes = 3;
+}
+
+message PbAlterConfigsRequestInfo {
+  required string config_key = 1;
+  optional string config_value = 2;
+  required int32 op_type = 3; // SET=0, DELETE=1, APPEND=2, SUBTRACT=3
+}
+
+message AlterTablePropertiesResponse {

Review Comment:
   ```suggestion
   message AlterTableConfigsResponse {
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java:
##########
@@ -294,6 +301,54 @@ public CompletableFuture<CreateTableResponse> 
createTable(CreateTableRequest req
         return CompletableFuture.completedFuture(new CreateTableResponse());
     }
 
+    @Override
+    public CompletableFuture<AlterTablePropertiesResponse> alterTable(
+            AlterTableConfigsRequest request) {
+        TablePath tablePath = toTablePath(request.getTablePath());
+        tablePath.validate();
+        if (authorizer != null) {
+            authorizer.authorize(currentSession(), OperationType.ALTER, 
Resource.table(tablePath));
+        }
+
+        AlterTablePropertiesResponse alterTableResponse = new 
AlterTablePropertiesResponse();
+
+        handleFlussTableChanges(
+                tablePath, request.getConfigChangesList(), 
request.isIgnoreIfNotExists());
+
+        return CompletableFuture.completedFuture(alterTableResponse);
+    }
+
+    private void handleFlussTableChanges(
+            TablePath tablePath,
+            List<PbAlterConfigsRequestInfo> configsRequestInfos,
+            boolean ignoreIfNotExists) {
+
+        List<TableChange> tableChanges =
+                configsRequestInfos.stream()
+                        .filter(Objects::nonNull)
+                        .map(ServerRpcMessageUtils::toFlussTableChange)
+                        .collect(Collectors.toList());
+
+        List<TableChange.SetOption> setOptions = new ArrayList<>();
+        List<TableChange.ResetOption> resetOptions = new ArrayList<>();
+
+        for (TableChange tableChange : tableChanges) {
+            if (tableChange instanceof TableChange.SetOption) {
+                setOptions.add((TableChange.SetOption) tableChange);
+            } else if (tableChange instanceof TableChange.ResetOption) {
+                resetOptions.add((TableChange.ResetOption) tableChange);
+            }
+            // add more FlussTableChange type

Review Comment:
   I suggest define an error type in Errors:
   ```
   
   INVALID_ALTER_TABLE_EXCEPTION(
   --
   55, "The alter table is invalid.", InvalidAlterTableException::new);
   
   
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java:
##########
@@ -294,6 +301,54 @@ public CompletableFuture<CreateTableResponse> 
createTable(CreateTableRequest req
         return CompletableFuture.completedFuture(new CreateTableResponse());
     }
 
+    @Override
+    public CompletableFuture<AlterTablePropertiesResponse> alterTable(
+            AlterTableConfigsRequest request) {
+        TablePath tablePath = toTablePath(request.getTablePath());
+        tablePath.validate();
+        if (authorizer != null) {
+            authorizer.authorize(currentSession(), OperationType.ALTER, 
Resource.table(tablePath));
+        }
+
+        AlterTablePropertiesResponse alterTableResponse = new 
AlterTablePropertiesResponse();
+
+        handleFlussTableChanges(
+                tablePath, request.getConfigChangesList(), 
request.isIgnoreIfNotExists());
+
+        return CompletableFuture.completedFuture(alterTableResponse);
+    }
+
+    private void handleFlussTableChanges(
+            TablePath tablePath,
+            List<PbAlterConfigsRequestInfo> configsRequestInfos,
+            boolean ignoreIfNotExists) {
+
+        List<TableChange> tableChanges =
+                configsRequestInfos.stream()
+                        .filter(Objects::nonNull)
+                        .map(ServerRpcMessageUtils::toFlussTableChange)
+                        .collect(Collectors.toList());
+
+        List<TableChange.SetOption> setOptions = new ArrayList<>();
+        List<TableChange.ResetOption> resetOptions = new ArrayList<>();
+
+        for (TableChange tableChange : tableChanges) {
+            if (tableChange instanceof TableChange.SetOption) {
+                setOptions.add((TableChange.SetOption) tableChange);
+            } else if (tableChange instanceof TableChange.ResetOption) {
+                resetOptions.add((TableChange.ResetOption) tableChange);
+            }
+            // add more FlussTableChange type

Review Comment:
   I suggest to throw exception if meets any unsupported table change instead 
of ignoing it. It'll cause the response of alter statement return suceesss, but 
the alter never happens, which is really strange.



##########
fluss-common/src/main/java/org/apache/fluss/metadata/AlterTableConfigsOpType.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+/** The operation type of altering table configurations. */
+public enum AlterTableConfigsOpType {
+    SET(1),
+    DELETE(2),
+    APPEND(3),
+    SUBTRACT(4);
+
+    public final int value;
+
+    AlterTableConfigsOpType(int value) {
+        this.value = value;
+    }
+
+    public static AlterTableConfigsOpType fromInt(int opType) {
+        switch (opType) {
+            case 1:
+                return SET;
+            case 2:
+                return DELETE;
+            case 3:
+                return APPEND;
+            case 4:
+                return SUBTRACT;
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported AlterTableConfigsOpType: " + opType);
+        }
+    }
+
+    public int toInt() {

Review Comment:
   ```suggestion
       public int value() {
   ```
   ?



##########
fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+import java.util.Objects;
+
+/** {@link TableChange} represents the modification of the Fluss Table. */
+public interface TableChange {

Review Comment:
   Add public in`PublicEvolving` and `@since` in java doc



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java:
##########
@@ -302,6 +306,101 @@ public long createTable(
                 "Fail to create table " + tablePath);
     }
 
+    public void alterTableProperties(
+            TablePath tablePath,
+            List<TableChange.SetOption> setOptions,
+            List<TableChange.ResetOption> resetOptions,
+            boolean ignoreIfNotExists) {
+
+        if (!databaseExists(tablePath.getDatabaseName())) {
+            throw new DatabaseNotExistException(
+                    "Database " + tablePath.getDatabaseName() + " does not 
exist.");
+        }
+        if (!tableExists(tablePath)) {
+            if (ignoreIfNotExists) {
+                return;
+            } else {
+                throw new TableNotExistException("Table " + tablePath + " does 
not exists.");
+            }
+        }
+
+        try {
+            TableRegistration updatedTableRegistration =
+                    getUpdatedTableRegistration(tablePath, setOptions, 
resetOptions);
+            if (updatedTableRegistration != null) {
+                zookeeperClient.updateTable(tablePath, 
updatedTableRegistration);
+            } else {
+                LOG.info(
+                        "No properties changed when alter table {}, skip 
update table.", tablePath);
+            }
+        } catch (Exception e) {
+            if (e instanceof KeeperException.NoNodeException) {
+                if (ignoreIfNotExists) {
+                    return;
+                }
+                throw new TableNotExistException("Table " + tablePath + " does 
not exists.");
+            } else {
+                throw new FlussRuntimeException("Failed to alter table: " + 
tablePath, e);
+            }
+        }
+    }
+
+    private TableRegistration getUpdatedTableRegistration(

Review Comment:
   @Nullable
   Add java doc explain when it'll then null



##########
fluss-common/src/main/java/org/apache/fluss/metadata/AlterTableConfigsOpType.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+/** The operation type of altering table configurations. */
+public enum AlterTableConfigsOpType {
+    SET(1),
+    DELETE(2),
+    APPEND(3),
+    SUBTRACT(4);
+
+    public final int value;
+
+    AlterTableConfigsOpType(int value) {
+        this.value = value;
+    }
+
+    public static AlterTableConfigsOpType fromInt(int opType) {

Review Comment:
   not used in this pr, can be removed?



##########
fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java:
##########
@@ -33,9 +35,15 @@ public class FlussConfigUtils {
     public static final String CLIENT_PREFIX = "client.";
     public static final String CLIENT_SECURITY_PREFIX = "client.security.";
 
+    public static final List<String> ALTERABLE_TABLE_CONFIG;
+    public static final List<String> ALTERABLE_CLIENT_OPTIONS;
+
     static {
         TABLE_OPTIONS = extractConfigOptions("table.");
         CLIENT_OPTIONS = extractConfigOptions("client.");
+        ALTERABLE_TABLE_CONFIG = Collections.emptyList();
+        ALTERABLE_CLIENT_OPTIONS =

Review Comment:
   Also, we don't store client option in table.



##########
fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java:
##########
@@ -238,6 +240,22 @@ public static ServerNode toServerNode(PbServerNode 
pbServerNode, ServerType serv
                 pbServerNode.hasRack() ? pbServerNode.getRack() : null);
     }
 
+    public static TableChange toFlussTableChange(
+            PbAlterConfigsRequestInfo pbAlterConfigsRequestInfo) {
+        switch (pbAlterConfigsRequestInfo.getOpType()) {
+            case 1: // SET_OPTION

Review Comment:
   case 0 should be SET_OPTION



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to