This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 08a1356dd7 [core] support alter view and alter view's dialect 
procedure (#5335)
08a1356dd7 is described below

commit 08a1356dd720bc1eb28d05ae969021d1f4d3805e
Author: jerry <[email protected]>
AuthorDate: Wed Mar 26 18:37:19 2025 +0800

    [core] support alter view and alter view's dialect procedure (#5335)
---
 docs/content/flink/procedures.md                   |  32 ++
 docs/content/spark/procedures.md                   |  21 ++
 .../java/org/apache/paimon/catalog/Catalog.java    |  71 +++++
 .../org/apache/paimon/catalog/DelegateCatalog.java |   7 +
 .../rest/ExponentialHttpRetryInterceptor.java      |  10 +-
 .../java/org/apache/paimon/rest/RESTCatalog.java   |  24 ++
 .../paimon/rest/requests/AlterViewRequest.java     |  49 +++
 .../paimon/rest/responses/ErrorResponse.java       |   2 +
 .../java/org/apache/paimon/view/ViewChange.java    | 354 +++++++++++++++++++++
 .../org/apache/paimon/catalog/CatalogTestBase.java |  40 +--
 .../rest/ExponentialHttpRetryInterceptorTest.java  |  44 ++-
 .../org/apache/paimon/rest/MockRESTMessage.java    |  17 +-
 .../org/apache/paimon/rest/RESTCatalogServer.java  |  98 +++++-
 .../org/apache/paimon/rest/RESTCatalogTest.java    |  74 +++++
 .../apache/paimon/rest/RESTObjectMapperTest.java   |  12 +
 .../java/org/apache/paimon/flink/FlinkCatalog.java |   6 +-
 .../flink/procedure/AlterViewDialectProcedure.java | 109 +++++++
 .../services/org.apache.paimon.factories.Factory   |   1 +
 .../org/apache/paimon/flink/RESTCatalogITCase.java |  86 +----
 .../apache/paimon/flink/RESTCatalogITCaseBase.java | 106 ++++++
 .../flink/procedure/AlterViewDialectITCase.java    |  92 ++++++
 paimon-open-api/rest-catalog-open-api.yaml         | 157 ++++++++-
 .../paimon/open/api/RESTCatalogController.java     |  29 ++
 .../org/apache/paimon/spark/SparkProcedures.java   |   2 +
 .../apache/paimon/spark/catalog/SupportView.java   |   4 +-
 .../spark/procedure/AlterViewDialectProcedure.java | 152 +++++++++
 .../catalyst/analysis/PaimonViewResolver.scala     |   3 +-
 .../paimon/spark/execution/PaimonViewExec.scala    |   5 +-
 .../spark/PaimonRestCatalogSparkTestBase.scala     |  73 +++++
 .../procedure/AlterViewDialectProcedureTest.scala  |  86 +++++
 30 files changed, 1611 insertions(+), 155 deletions(-)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 93ec13aec0..e191108068 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -799,5 +799,37 @@ All available procedures are listed below.
          CALL sys.rescale(`table` => 'default.T', `bucket_num` => 16, 
`partition` => 'dt=20250217,hh=08')
       </td>
    </tr>
+   <tr>
+      <td>alter_view_dialect</td>
+      <td>
+         -- add dialect in the view<br/>
+         CALL [catalog.]sys.alter_view_dialect('view_identifier', 'add', 
'flink', 'query')<br/>
+         CALL [catalog.]sys.alter_view_dialect(`view` => 'view_identifier', 
`action` => 'add', `query` => 'query')<br/><br/>
+         -- update dialect in the view<br/>
+         CALL [catalog.]sys.alter_view_dialect('view_identifier', 'update', 
'flink', 'query')<br/>
+         CALL [catalog.]sys.alter_view_dialect(`view` => 'view_identifier', 
`action` => 'update', `query` => 'query')<br/><br/>
+         -- drop dialect in the view<br/>
+         CALL [catalog.]sys.alter_view_dialect('view_identifier', 'drop', 
'flink')<br/>
+         CALL [catalog.]sys.alter_view_dialect(`view` => 'view_identifier', 
`action` => 'drop')<br/><br/>
+      </td>
+      <td>
+         To alter view dialect. Arguments:
+            <li>view: the target view identifier. Cannot be empty.</li>
+            <li>action: define change action like: add, update, drop. Cannot 
be empty.</li>
+            <li>engine: the engine of the dialect. Needs to be specified only when it is not flink.</li>
+            <li>query: the query for the dialect. Cannot be empty when action is 
add or update.</li>
+      </td>
+      <td>
+         -- add dialect in the view<br/>
+         CALL sys.alter_view_dialect('view_identifier', 'add', 'flink', 
'query')<br/>
+         CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 
'add', `query` => 'query')<br/><br/>
+         -- update dialect in the view<br/>
+         CALL sys.alter_view_dialect('view_identifier', 'update', 'flink', 
'query')<br/>
+         CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 
'update', `query` => 'query')<br/><br/>
+         -- drop dialect in the view<br/>
+         CALL sys.alter_view_dialect('view_identifier', 'drop', 'flink')<br/>
+         CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 
'drop')<br/><br/>
+      </td>
+   </tr>
    </tbody>
 </table>
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index e534bcb480..fb46b6266c 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -380,5 +380,26 @@ This section introduce all available spark procedures 
about paimon.
          CALL sys.compact_manifest(`table` => 'default.T')
       </td>
    </tr>
+   <tr>
+      <td>alter_view_dialect</td>
+      <td>
+         To alter view dialect. Arguments:
+            <li>view: the target view identifier. Cannot be empty.</li>
+            <li>action: define change action like: add, update, drop. Cannot 
be empty.</li>
+            <li>engine: the engine of the dialect. Needs to be specified only when it is not spark.</li>
+            <li>query: the query for the dialect. Cannot be empty when action is 
add or update.</li>
+      </td>
+      <td>
+         -- add dialect in the view<br/>
+         CALL sys.alter_view_dialect('view_identifier', 'add', 'spark', 
'query')<br/>
+         CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 
'add', `query` => 'query')<br/><br/>
+         -- update dialect in the view<br/>
+         CALL sys.alter_view_dialect('view_identifier', 'update', 'spark', 
'query')<br/>
+         CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 
'update', `query` => 'query')<br/><br/>
+         -- drop dialect in the view<br/>
+         CALL sys.alter_view_dialect('view_identifier', 'drop', 'spark')<br/>
+         CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 
'drop')<br/><br/>
+      </td>
+   </tr>
    </tbody>
 </table>
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 3698f7a670..ed87935736 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -29,6 +29,7 @@ import org.apache.paimon.table.Instant;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableSnapshot;
 import org.apache.paimon.view.View;
+import org.apache.paimon.view.ViewChange;
 
 import javax.annotation.Nullable;
 
@@ -437,6 +438,21 @@ public interface Catalog extends AutoCloseable {
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * Alter a view.
+     *
+     * @param view identifier of the view to alter
+     * @param viewChanges changes to apply to the view
+     * @param ignoreIfNotExists whether to ignore the request if the view does not exist
+     * @throws ViewNotExistException if the view does not exist
+     * @throws DialectAlreadyExistException if the dialect already exists
+     * @throws DialectNotExistException if the dialect does not exist
+     */
+    default void alterView(Identifier view, List<ViewChange> viewChanges, 
boolean ignoreIfNotExists)
+            throws ViewNotExistException, DialectAlreadyExistException, 
DialectNotExistException {
+        throw new UnsupportedOperationException();
+    }
+
     // ======================= repair methods ===============================
 
     /**
@@ -884,6 +900,34 @@ public interface Catalog extends AutoCloseable {
         }
     }
 
+    /** Exception for trying to add a dialect that already exists. */
+    class DialectAlreadyExistException extends Exception {
+
+        private static final String MSG = "Dialect %s in view %s already 
exists.";
+
+        private final Identifier identifier;
+        private final String dialect;
+
+        public DialectAlreadyExistException(Identifier identifier, String 
dialect) {
+            this(identifier, dialect, null);
+        }
+
+        public DialectAlreadyExistException(
+                Identifier identifier, String dialect, Throwable cause) {
+            super(String.format(MSG, dialect, identifier.getFullName()), 
cause);
+            this.identifier = identifier;
+            this.dialect = dialect;
+        }
+
+        public Identifier identifier() {
+            return identifier;
+        }
+
+        public String dialect() {
+            return dialect;
+        }
+    }
+
     /** Exception for trying to create a branch that already exists. */
     class BranchAlreadyExistException extends Exception {
 
@@ -964,4 +1008,31 @@ public interface Catalog extends AutoCloseable {
             return tag;
         }
     }
+
+    /** Exception for trying to update dialect that doesn't exist. */
+    class DialectNotExistException extends Exception {
+
+        private static final String MSG = "Dialect %s in view %s doesn't 
exist.";
+
+        private final Identifier identifier;
+        private final String dialect;
+
+        public DialectNotExistException(Identifier identifier, String dialect) 
{
+            this(identifier, dialect, null);
+        }
+
+        public DialectNotExistException(Identifier identifier, String dialect, 
Throwable cause) {
+            super(String.format(MSG, dialect, identifier.getFullName()), 
cause);
+            this.identifier = identifier;
+            this.dialect = dialect;
+        }
+
+        public Identifier identifier() {
+            return identifier;
+        }
+
+        public String dialect() {
+            return dialect;
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index 756881af17..47064f8e8f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -28,6 +28,7 @@ import org.apache.paimon.table.Instant;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableSnapshot;
 import org.apache.paimon.view.View;
+import org.apache.paimon.view.ViewChange;
 
 import javax.annotation.Nullable;
 
@@ -251,6 +252,12 @@ public abstract class DelegateCatalog implements Catalog {
         wrapped.renameView(fromView, toView, ignoreIfNotExists);
     }
 
+    @Override
+    public void alterView(Identifier view, List<ViewChange> viewChanges, 
boolean ignoreIfNotExists)
+            throws ViewNotExistException, DialectAlreadyExistException, 
DialectNotExistException {
+        wrapped.alterView(view, viewChanges, ignoreIfNotExists);
+    }
+
     @Override
     public List<Partition> listPartitions(Identifier identifier) throws 
TableNotExistException {
         return wrapped.listPartitions(identifier);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptor.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptor.java
index dd16e47fc5..524080e42b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptor.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptor.java
@@ -122,19 +122,19 @@ public class ExponentialHttpRetryInterceptor implements 
Interceptor {
                 || (!response.isSuccessful() && 
retrievableCodes.contains(response.code()));
     }
 
-    public boolean needRetry(String method, IOException e, int execCount) {
+    public boolean needRetry(String method, IOException e, int execCount) 
throws IOException {
         if (execCount > maxRetries) {
-            return false;
+            throw e;
         }
         if (!retrievableMethods.contains(method)) {
-            return false;
+            throw e;
         }
         if (nonRetriableExceptions.contains(e.getClass())) {
-            return false;
+            throw e;
         } else {
             for (Class<? extends IOException> rejectException : 
nonRetriableExceptions) {
                 if (rejectException.isInstance(e)) {
-                    return false;
+                    throw e;
                 }
             }
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index be61cfd894..0b0020a461 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -44,6 +44,7 @@ import 
org.apache.paimon.rest.exceptions.NotImplementedException;
 import org.apache.paimon.rest.exceptions.ServiceFailureException;
 import org.apache.paimon.rest.requests.AlterDatabaseRequest;
 import org.apache.paimon.rest.requests.AlterTableRequest;
+import org.apache.paimon.rest.requests.AlterViewRequest;
 import org.apache.paimon.rest.requests.CommitTableRequest;
 import org.apache.paimon.rest.requests.CreateBranchRequest;
 import org.apache.paimon.rest.requests.CreateDatabaseRequest;
@@ -80,6 +81,7 @@ import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.system.SystemTableLoader;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.view.View;
+import org.apache.paimon.view.ViewChange;
 import org.apache.paimon.view.ViewImpl;
 import org.apache.paimon.view.ViewSchema;
 
@@ -899,6 +901,28 @@ public class RESTCatalog implements Catalog {
         }
     }
 
+    @Override
+    public void alterView(
+            Identifier identifier, List<ViewChange> viewChanges, boolean 
ignoreIfNotExists)
+            throws ViewNotExistException, DialectAlreadyExistException, 
DialectNotExistException {
+        try {
+            AlterViewRequest request = new AlterViewRequest(viewChanges);
+            client.post(
+                    resourcePaths.view(identifier.getDatabaseName(), 
identifier.getObjectName()),
+                    request,
+                    restAuthFunction);
+        } catch (AlreadyExistsException e) {
+            throw new DialectAlreadyExistException(identifier, 
e.resourceName());
+        } catch (NoSuchResourceException e) {
+            if (StringUtils.equals(e.resourceType(), 
ErrorResponse.RESOURCE_TYPE_DIALECT)) {
+                throw new DialectNotExistException(identifier, 
e.resourceName());
+            }
+            if (!ignoreIfNotExists) {
+                throw new ViewNotExistException(identifier);
+            }
+        }
+    }
+
     @Override
     public boolean caseSensitive() {
         return context.options().getOptional(CASE_SENSITIVE).orElse(true);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/requests/AlterViewRequest.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/requests/AlterViewRequest.java
new file mode 100644
index 0000000000..f31e903840
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/requests/AlterViewRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.requests;
+
+import org.apache.paimon.rest.RESTRequest;
+import org.apache.paimon.view.ViewChange;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/** Request for altering view. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AlterViewRequest implements RESTRequest {
+
+    private static final String FIELD_CHANGES = "changes";
+
+    @JsonProperty(FIELD_CHANGES)
+    private final List<ViewChange> viewChanges;
+
+    @JsonCreator
+    public AlterViewRequest(@JsonProperty(FIELD_CHANGES) List<ViewChange> 
viewChanges) {
+        this.viewChanges = viewChanges;
+    }
+
+    @JsonGetter(FIELD_CHANGES)
+    public List<ViewChange> viewChanges() {
+        return viewChanges;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponse.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponse.java
index 04a9119d4f..b66459a5f0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponse.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponse.java
@@ -43,6 +43,8 @@ public class ErrorResponse implements RESTResponse {
 
     public static final String RESOURCE_TYPE_VIEW = "VIEW";
 
+    public static final String RESOURCE_TYPE_DIALECT = "DIALECT";
+
     private static final String FIELD_MESSAGE = "message";
     private static final String FIELD_RESOURCE_TYPE = "resourceType";
     private static final String FIELD_RESOURCE_NAME = "resourceName";
diff --git a/paimon-core/src/main/java/org/apache/paimon/view/ViewChange.java 
b/paimon-core/src/main/java/org/apache/paimon/view/ViewChange.java
new file mode 100644
index 0000000000..da15a6032e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/view/ViewChange.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.view;
+
+import org.apache.paimon.annotation.Public;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Change to a view: its options, comment or dialects. */
+@Public
+@JsonTypeInfo(
+        use = JsonTypeInfo.Id.NAME,
+        include = JsonTypeInfo.As.PROPERTY,
+        property = ViewChange.Actions.FIELD_TYPE)
+@JsonSubTypes({
+    @JsonSubTypes.Type(
+            value = ViewChange.SetViewOption.class,
+            name = ViewChange.Actions.SET_OPTION_ACTION),
+    @JsonSubTypes.Type(
+            value = ViewChange.RemoveViewOption.class,
+            name = ViewChange.Actions.REMOVE_OPTION_ACTION),
+    @JsonSubTypes.Type(
+            value = ViewChange.UpdateViewComment.class,
+            name = ViewChange.Actions.UPDATE_COMMENT_ACTION),
+    @JsonSubTypes.Type(
+            value = ViewChange.AddDialect.class,
+            name = ViewChange.Actions.ADD_DIALECT_ACTION),
+    @JsonSubTypes.Type(
+            value = ViewChange.UpdateDialect.class,
+            name = ViewChange.Actions.UPDATE_DIALECT_ACTION),
+    @JsonSubTypes.Type(
+            value = ViewChange.DropDialect.class,
+            name = ViewChange.Actions.DROP_DIALECT_ACTION)
+})
+public interface ViewChange extends Serializable {
+
+    static ViewChange setOption(String key, String value) {
+        return new ViewChange.SetViewOption(key, value);
+    }
+
+    static ViewChange removeOption(String key) {
+        return new ViewChange.RemoveViewOption(key);
+    }
+
+    static ViewChange updateComment(String comment) {
+        return new ViewChange.UpdateViewComment(comment);
+    }
+
+    static ViewChange addDialect(String dialect, String query) {
+        return new AddDialect(dialect, query);
+    }
+
+    static ViewChange updateDialect(String dialect, String query) {
+        return new UpdateDialect(dialect, query);
+    }
+
+    static ViewChange dropDialect(String dialect) {
+        return new DropDialect(dialect);
+    }
+
+    /** set a view option for view change. */
+    final class SetViewOption implements ViewChange {
+
+        private static final long serialVersionUID = 1L;
+
+        private static final String FIELD_KEY = "key";
+        private static final String FIELD_VALUE = "value";
+
+        @JsonProperty(FIELD_KEY)
+        private final String key;
+
+        @JsonProperty(FIELD_VALUE)
+        private final String value;
+
+        @JsonCreator
+        private SetViewOption(
+                @JsonProperty(FIELD_KEY) String key, 
@JsonProperty(FIELD_VALUE) String value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        @JsonGetter(FIELD_KEY)
+        public String key() {
+            return key;
+        }
+
+        @JsonGetter(FIELD_VALUE)
+        public String value() {
+            return value;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            SetViewOption that = (SetViewOption) o;
+            return key.equals(that.key) && value.equals(that.value);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(key, value);
+        }
+    }
+
+    /** remove a view option for view change. */
+    final class RemoveViewOption implements ViewChange {
+
+        private static final long serialVersionUID = 1L;
+
+        private static final String FIELD_KEY = "key";
+
+        @JsonProperty(FIELD_KEY)
+        private final String key;
+
+        private RemoveViewOption(@JsonProperty(FIELD_KEY) String key) {
+            this.key = key;
+        }
+
+        @JsonGetter(FIELD_KEY)
+        public String key() {
+            return key;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            RemoveViewOption that = (RemoveViewOption) o;
+            return key.equals(that.key);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(key);
+        }
+    }
+
+    /** update a view comment for view change. */
+    final class UpdateViewComment implements ViewChange {
+
+        private static final long serialVersionUID = 1L;
+
+        private static final String FIELD_COMMENT = "comment";
+
+        // If comment is null, means to remove comment
+        @JsonProperty(FIELD_COMMENT)
+        private final @Nullable String comment;
+
+        private UpdateViewComment(@JsonProperty(FIELD_COMMENT) @Nullable 
String comment) {
+            this.comment = comment;
+        }
+
+        @JsonGetter(FIELD_COMMENT)
+        public @Nullable String comment() {
+            return comment;
+        }
+
+        @Override
+        public boolean equals(Object object) {
+            if (this == object) {
+                return true;
+            }
+            if (object == null || getClass() != object.getClass()) {
+                return false;
+            }
+            UpdateViewComment that = (UpdateViewComment) object;
+            return Objects.equals(comment, that.comment);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(comment);
+        }
+    }
+
+    /** add a dialect for view change. */
+    final class AddDialect implements ViewChange {
+        private static final long serialVersionUID = 1L;
+        private static final String FIELD_DIALECT = "dialect";
+        private static final String FIELD_QUERY = "query";
+
+        @JsonProperty(FIELD_DIALECT)
+        private final String dialect;
+
+        @JsonProperty(FIELD_QUERY)
+        private final String query;
+
+        @JsonCreator
+        public AddDialect(
+                @JsonProperty(FIELD_DIALECT) String dialect,
+                @JsonProperty(FIELD_QUERY) String query) {
+            this.dialect = dialect;
+            this.query = query;
+        }
+
+        @JsonGetter(FIELD_DIALECT)
+        public String dialect() {
+            return dialect;
+        }
+
+        @JsonGetter(FIELD_QUERY)
+        public String query() {
+            return query;
+        }
+
+        @Override
+        public boolean equals(Object object) {
+            if (this == object) {
+                return true;
+            }
+            if (object == null || getClass() != object.getClass()) {
+                return false;
+            }
+            AddDialect that = (AddDialect) object;
+            return Objects.equals(dialect, that.dialect) && 
Objects.equals(query, that.query);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(dialect, query);
+        }
+    }
+
+    /** update dialect for view change. */
+    final class UpdateDialect implements ViewChange {
+        private static final long serialVersionUID = 1L;
+        private static final String FIELD_DIALECT = "dialect";
+        private static final String FIELD_QUERY = "query";
+
+        @JsonProperty(FIELD_DIALECT)
+        private final String dialect;
+
+        @JsonProperty(FIELD_QUERY)
+        private final String query;
+
+        @JsonCreator
+        public UpdateDialect(
+                @JsonProperty(FIELD_DIALECT) String dialect,
+                @JsonProperty(FIELD_QUERY) String query) {
+            this.dialect = dialect;
+            this.query = query;
+        }
+
+        @JsonGetter(FIELD_DIALECT)
+        public String dialect() {
+            return dialect;
+        }
+
+        @JsonGetter(FIELD_QUERY)
+        public String query() {
+            return query;
+        }
+
+        @Override
+        public boolean equals(Object object) {
+            if (this == object) {
+                return true;
+            }
+            if (object == null || getClass() != object.getClass()) {
+                return false;
+            }
+            UpdateDialect that = (UpdateDialect) object;
+            return Objects.equals(dialect, that.dialect) && 
Objects.equals(query, that.query);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(dialect, query);
+        }
+    }
+
+    /** drop dialect for view change. */
+    final class DropDialect implements ViewChange {
+        private static final long serialVersionUID = 1L;
+        private static final String FIELD_DIALECT = "dialect";
+
+        @JsonProperty(FIELD_DIALECT)
+        private final String dialect;
+
+        @JsonCreator
+        public DropDialect(@JsonProperty(FIELD_DIALECT) String dialect) {
+            this.dialect = dialect;
+        }
+
+        @JsonGetter(FIELD_DIALECT)
+        public String dialect() {
+            return dialect;
+        }
+
+        @Override
+        public boolean equals(Object object) {
+            if (this == object) {
+                return true;
+            }
+            if (object == null || getClass() != object.getClass()) {
+                return false;
+            }
+            DropDialect that = (DropDialect) object;
+            return Objects.equals(dialect, that.dialect);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(dialect);
+        }
+    }
+
+    /** Actions for view alter. */
+    class Actions {
+        public static final String FIELD_TYPE = "action";
+        public static final String ADD_DIALECT_ACTION = "addDialect";
+        public static final String UPDATE_DIALECT_ACTION = "updateDialect";
+        public static final String DROP_DIALECT_ACTION = "dropDialect";
+        public static final String SET_OPTION_ACTION = "setOption";
+        public static final String REMOVE_OPTION_ACTION = "removeOption";
+        public static final String UPDATE_COMMENT_ACTION = "updateComment";
+
+        private Actions() {}
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 49d6f623c1..2c7f8812ac 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -1048,26 +1048,8 @@ public abstract class CatalogTestBase {
         if (!supportsView()) {
             return;
         }
-
         Identifier identifier = new Identifier("view_db", "my_view");
-        RowType rowType =
-                RowType.builder()
-                        .field("str", DataTypes.STRING())
-                        .field("int", DataTypes.INT())
-                        .build();
-        String query = "SELECT * FROM OTHER_TABLE";
-        String comment = "it is my view";
-        Map<String, String> options = new HashMap<>();
-        options.put("key1", "v1");
-        options.put("key2", "v2");
-
-        Map<String, String> dialects = new HashMap<>();
-        if (supportsViewDialects()) {
-            dialects.put("flink", "SELECT * FROM FLINK_TABLE");
-            dialects.put("spark", "SELECT * FROM SPARK_TABLE");
-        }
-        View view =
-                new ViewImpl(identifier, rowType.getFields(), query, dialects, 
comment, options);
+        View view = createView(identifier);
 
         assertThatThrownBy(() -> catalog.createView(identifier, view, false))
                 .isInstanceOf(Catalog.DatabaseNotExistException.class);
@@ -1568,4 +1550,24 @@ public abstract class CatalogTestBase {
         options.put("type", "format-table");
         return options;
     }
+
+    /**
+     * Builds a view fixture for the given identifier: columns {@code str} and {@code int}, a
+     * default query, a comment and two options. Per-engine queries are attached only when {@link
+     * #supportsViewDialects()} returns true.
+     */
+    protected View createView(Identifier identifier) {
+        Map<String, String> viewOptions = new HashMap<>();
+        viewOptions.put("key1", "v1");
+        viewOptions.put("key2", "v2");
+
+        Map<String, String> dialectQueries = new HashMap<>();
+        if (supportsViewDialects()) {
+            dialectQueries.put("flink", "SELECT * FROM FLINK_TABLE");
+            dialectQueries.put("spark", "SELECT * FROM SPARK_TABLE");
+        }
+
+        RowType schema =
+                RowType.builder()
+                        .field("str", DataTypes.STRING())
+                        .field("int", DataTypes.INT())
+                        .build();
+        return new ViewImpl(
+                identifier,
+                schema.getFields(),
+                "SELECT * FROM OTHER_TABLE",
+                dialectQueries,
+                "it is my view",
+                viewOptions);
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptorTest.java
index 6510371f2d..02e285b7e7 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptorTest.java
@@ -33,6 +33,7 @@ import java.net.NoRouteToHostException;
 import java.net.UnknownHostException;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link ExponentialHttpRetryInterceptor}. */
 class ExponentialHttpRetryInterceptorTest {
@@ -42,7 +43,7 @@ class ExponentialHttpRetryInterceptorTest {
             new ExponentialHttpRetryInterceptor(maxRetries);
 
     @Test
-    void testNeedRetryByMethod() {
+    void testNeedRetryByMethod() throws IOException {
 
         assertThat(interceptor.needRetry("GET", new IOException(), 
1)).isTrue();
         assertThat(interceptor.needRetry("HEAD", new IOException(), 
1)).isTrue();
@@ -50,25 +51,38 @@ class ExponentialHttpRetryInterceptorTest {
         assertThat(interceptor.needRetry("DELETE", new IOException(), 
1)).isTrue();
         assertThat(interceptor.needRetry("TRACE", new IOException(), 
1)).isTrue();
         assertThat(interceptor.needRetry("OPTIONS", new IOException(), 
1)).isTrue();
-
-        assertThat(interceptor.needRetry("POST", new IOException(), 
1)).isFalse();
-        assertThat(interceptor.needRetry("PATCH", new IOException(), 
1)).isFalse();
-        assertThat(interceptor.needRetry("CONNECT", new IOException(), 
1)).isFalse();
-        assertThat(interceptor.needRetry("GET", new IOException(), maxRetries 
+ 1)).isFalse();
+        assertThatThrownBy(() -> interceptor.needRetry("POST", new 
IOException(), 1))
+                .isInstanceOf(IOException.class);
+        assertThatThrownBy(() -> interceptor.needRetry("POST", new 
IOException(), 1))
+                .isInstanceOf(IOException.class);
+        assertThatThrownBy(() -> interceptor.needRetry("PATCH", new 
IOException(), 1))
+                .isInstanceOf(IOException.class);
+        assertThatThrownBy(() -> interceptor.needRetry("CONNECT", new 
IOException(), 1))
+                .isInstanceOf(IOException.class);
+        assertThatThrownBy(() -> interceptor.needRetry("GET", new 
IOException(), maxRetries + 1))
+                .isInstanceOf(IOException.class);
     }
 
     @Test
-    void testNeedRetryByException() {
-
-        assertThat(interceptor.needRetry("GET", new InterruptedIOException(), 
1)).isFalse();
-        assertThat(interceptor.needRetry("GET", new UnknownHostException(), 
1)).isFalse();
-        assertThat(interceptor.needRetry("GET", new ConnectException(), 
1)).isFalse();
-        assertThat(interceptor.needRetry("GET", new NoRouteToHostException(), 
1)).isFalse();
-        assertThat(interceptor.needRetry("GET", new SSLException("error"), 
1)).isFalse();
+    void testNeedRetryByException() throws IOException {
+
+        assertThatThrownBy(() -> interceptor.needRetry("GET", new 
InterruptedIOException(), 1))
+                .isInstanceOf(InterruptedIOException.class);
+        assertThatThrownBy(() -> interceptor.needRetry("GET", new 
UnknownHostException(), 1))
+                .isInstanceOf(UnknownHostException.class);
+        assertThatThrownBy(() -> interceptor.needRetry("GET", new 
ConnectException(), 1))
+                .isInstanceOf(ConnectException.class);
+        assertThatThrownBy(() -> interceptor.needRetry("GET", new 
NoRouteToHostException(), 1))
+                .isInstanceOf(NoRouteToHostException.class);
+        assertThatThrownBy(() -> interceptor.needRetry("GET", new 
SSLException("error"), 1))
+                .isInstanceOf(SSLException.class);
 
         assertThat(interceptor.needRetry("GET", new IOException("error"), 
1)).isTrue();
-        assertThat(interceptor.needRetry("GET", new IOException("error"), 
maxRetries + 1))
-                .isFalse();
+        assertThatThrownBy(
+                        () ->
+                                interceptor.needRetry(
+                                        "GET", new IOException("error"), 
maxRetries + 1))
+                .isInstanceOf(IOException.class);
     }
 
     @Test
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
index 0d5d46c550..b5155c58f1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
@@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.rest.requests.AlterDatabaseRequest;
 import org.apache.paimon.rest.requests.AlterTableRequest;
+import org.apache.paimon.rest.requests.AlterViewRequest;
 import org.apache.paimon.rest.requests.CreateDatabaseRequest;
 import org.apache.paimon.rest.requests.CreateTableRequest;
 import org.apache.paimon.rest.requests.CreateViewRequest;
@@ -44,6 +45,7 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.view.ViewChange;
 import org.apache.paimon.view.ViewSchema;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
@@ -271,6 +273,17 @@ public class MockRESTMessage {
         return new RollbackTableRequest(Instant.tag(tagName));
     }
 
+    /**
+     * Returns a request carrying six changes, in this order: set option, remove option, update
+     * comment, add dialect, update dialect, drop dialect.
+     */
+    public static AlterViewRequest alterViewRequest() {
+        List<ViewChange> changes = new ArrayList<>();
+        Collections.addAll(
+                changes,
+                ViewChange.setOption("key", "value"),
+                ViewChange.removeOption("key"),
+                ViewChange.updateComment("comment"),
+                ViewChange.addDialect("dialect", "query"),
+                ViewChange.updateDialect("dialect", "query"),
+                ViewChange.dropDialect("dialect"));
+        return new AlterViewRequest(changes);
+    }
+
     private static ViewSchema viewSchema() {
         List<DataField> fields =
                 Arrays.asList(
@@ -284,10 +297,6 @@ public class MockRESTMessage {
                 Collections.singletonMap("pt", "1"));
     }
 
-    private static Partition partition() {
-        return new Partition(Collections.singletonMap("pt", "1"), 1, 1, 1, 1, 
false);
-    }
-
     private static Schema schema(Map<String, String> options) {
         List<DataField> fields =
                 Arrays.asList(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index fd1497d03d..129448f861 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -42,6 +42,7 @@ import org.apache.paimon.rest.auth.AuthProvider;
 import org.apache.paimon.rest.auth.RESTAuthParameter;
 import org.apache.paimon.rest.requests.AlterDatabaseRequest;
 import org.apache.paimon.rest.requests.AlterTableRequest;
+import org.apache.paimon.rest.requests.AlterViewRequest;
 import org.apache.paimon.rest.requests.CommitTableRequest;
 import org.apache.paimon.rest.requests.CreateBranchRequest;
 import org.apache.paimon.rest.requests.CreateDatabaseRequest;
@@ -77,6 +78,7 @@ import org.apache.paimon.table.TableSnapshot;
 import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.view.View;
+import org.apache.paimon.view.ViewChange;
 import org.apache.paimon.view.ViewImpl;
 import org.apache.paimon.view.ViewSchema;
 
@@ -352,10 +354,7 @@ public class RESTCatalogServer {
                             return new MockResponse().setResponseCode(200);
                         } else if (isPartitions) {
                             return partitionsApiHandle(
-                                    restAuthParameter.method(),
-                                    restAuthParameter.data(),
-                                    parameters,
-                                    identifier);
+                                    restAuthParameter.method(), parameters, 
identifier);
                         } else if (isBranches) {
                             return branchApiHandle(
                                     resources,
@@ -411,7 +410,10 @@ public class RESTCatalogServer {
                             return viewDetailsHandle(
                                     restAuthParameter.method(), databaseName, 
parameters);
                         } else if (isView) {
-                            return viewHandle(restAuthParameter.method(), 
identifier);
+                            return viewHandle(
+                                    restAuthParameter.method(),
+                                    identifier,
+                                    restAuthParameter.data());
                         } else {
                             return databaseHandle(
                                     restAuthParameter.method(),
@@ -492,6 +494,14 @@ public class RESTCatalogServer {
                                     e.getMessage(),
                                     404);
                     return mockResponse(response, 404);
+                } catch (Catalog.DialectNotExistException e) {
+                    response =
+                            new ErrorResponse(
+                                    ErrorResponse.RESOURCE_TYPE_DIALECT,
+                                    e.dialect(),
+                                    e.getMessage(),
+                                    404);
+                    return mockResponse(response, 404);
                 } catch (Catalog.ViewAlreadyExistException e) {
                     response =
                             new ErrorResponse(
@@ -500,6 +510,14 @@ public class RESTCatalogServer {
                                     e.getMessage(),
                                     409);
                     return mockResponse(response, 409);
+                } catch (Catalog.DialectAlreadyExistException e) {
+                    response =
+                            new ErrorResponse(
+                                    ErrorResponse.RESOURCE_TYPE_DIALECT,
+                                    e.dialect(),
+                                    e.getMessage(),
+                                    409);
+                    return mockResponse(response, 409);
                 } catch (IllegalArgumentException e) {
                     response = new ErrorResponse(null, null, e.getMessage(), 
400);
                     return mockResponse(response, 400);
@@ -1026,7 +1044,7 @@ public class RESTCatalogServer {
     }
 
     private MockResponse partitionsApiHandle(
-            String method, String data, Map<String, String> parameters, 
Identifier tableIdentifier)
+            String method, Map<String, String> parameters, Identifier 
tableIdentifier)
             throws Exception {
         switch (method) {
             case "GET":
@@ -1301,7 +1319,8 @@ public class RESTCatalogServer {
                 .collect(Collectors.toList());
     }
 
-    private MockResponse viewHandle(String method, Identifier identifier) 
throws Exception {
+    private MockResponse viewHandle(String method, Identifier identifier, 
String requestData)
+            throws Exception {
         RESTResponse response;
         if (viewStore.containsKey(identifier.getFullName())) {
             switch (method) {
@@ -1331,6 +1350,71 @@ public class RESTCatalogServer {
                 case "DELETE":
                     viewStore.remove(identifier.getFullName());
                     return new MockResponse().setResponseCode(200);
+                case "POST":
+                    // Alter view: apply the requested changes, in request order, to working
+                    // copies of the stored view's dialects, options and comment, then store the
+                    // rebuilt view under the same key.
+                    if (viewStore.containsKey(identifier.getFullName())) {
+                        AlterViewRequest request =
+                                OBJECT_MAPPER.readValue(requestData, AlterViewRequest.class);
+                        ViewImpl view = (ViewImpl) viewStore.get(identifier.getFullName());
+                        HashMap<String, String> newDialects = new HashMap<>(view.dialects());
+                        Map<String, String> newOptions = new HashMap<>(view.options());
+                        String newComment = view.comment().orElse(null);
+                        for (ViewChange viewChange : request.viewChanges()) {
+                            if (viewChange instanceof ViewChange.SetViewOption) {
+                                ViewChange.SetViewOption setViewOption =
+                                        (ViewChange.SetViewOption) viewChange;
+                                newOptions.put(setViewOption.key(), setViewOption.value());
+                            } else if (viewChange instanceof ViewChange.RemoveViewOption) {
+                                ViewChange.RemoveViewOption removeViewOption =
+                                        (ViewChange.RemoveViewOption) viewChange;
+                                newOptions.remove(removeViewOption.key());
+                            } else if (viewChange instanceof ViewChange.UpdateViewComment) {
+                                ViewChange.UpdateViewComment updateViewComment =
+                                        (ViewChange.UpdateViewComment) viewChange;
+                                newComment = updateViewComment.comment();
+                            } else if (viewChange instanceof ViewChange.AddDialect) {
+                                ViewChange.AddDialect addDialect =
+                                        (ViewChange.AddDialect) viewChange;
+                                // Existence is checked against the working copy rather than the
+                                // stored view, so earlier changes of the same request count.
+                                if (newDialects.containsKey(addDialect.dialect())) {
+                                    throw new Catalog.DialectAlreadyExistException(
+                                            identifier, addDialect.dialect());
+                                } else {
+                                    newDialects.put(addDialect.dialect(), addDialect.query());
+                                }
+                            } else if (viewChange instanceof ViewChange.UpdateDialect) {
+                                ViewChange.UpdateDialect updateDialect =
+                                        (ViewChange.UpdateDialect) viewChange;
+                                if (newDialects.containsKey(updateDialect.dialect())) {
+                                    newDialects.put(updateDialect.dialect(), updateDialect.query());
+                                } else {
+                                    throw new Catalog.DialectNotExistException(
+                                            identifier, updateDialect.dialect());
+                                }
+                            } else if (viewChange instanceof ViewChange.DropDialect) {
+                                ViewChange.DropDialect dropDialect =
+                                        (ViewChange.DropDialect) viewChange;
+                                if (newDialects.containsKey(dropDialect.dialect())) {
+                                    newDialects.remove(dropDialect.dialect());
+                                } else {
+                                    throw new Catalog.DialectNotExistException(
+                                            identifier, dropDialect.dialect());
+                                }
+                            }
+                        }
+                        // Nothing is stored until every change has been applied, so a failing
+                        // change leaves the stored view untouched.
+                        view =
+                                new ViewImpl(
+                                        identifier,
+                                        view.rowType().getFields(),
+                                        view.query(),
+                                        newDialects,
+                                        newComment,
+                                        newOptions);
+                        viewStore.put(identifier.getFullName(), view);
+                        return new MockResponse().setResponseCode(200);
+                    } else {
+                        throw new Catalog.ViewNotExistException(identifier);
+                    }
                 default:
                     return new MockResponse().setResponseCode(404);
             }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index f7d734e05c..920400c53a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -50,6 +50,7 @@ import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.view.View;
+import org.apache.paimon.view.ViewChange;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
@@ -82,6 +83,7 @@ import static 
org.apache.paimon.rest.auth.DLFAuthProvider.TOKEN_DATE_FORMATTER;
 import static 
org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -1053,6 +1055,78 @@ public abstract class RESTCatalogTest extends 
CatalogTestBase {
         assertThat(fetchData).containsSequence(testData);
     }
 
+    @Test
+    void testAlterView() throws Exception {
+        Identifier identifier = new Identifier("rest_catalog_db", "my_view");
+        View view = createView(identifier);
+        catalog.createDatabase(identifier.getDatabaseName(), false);
+        ViewChange.AddDialect addDialect =
+                (ViewChange.AddDialect)
+                        ViewChange.addDialect("flink_1", "SELECT * FROM FLINK_TABLE_1");
+
+        // The view does not exist yet: with the last argument true the call is a no-op, with
+        // false it must fail.
+        assertDoesNotThrow(() -> catalog.alterView(identifier, ImmutableList.of(addDialect), true));
+        assertThrows(
+                Catalog.ViewNotExistException.class,
+                () -> catalog.alterView(identifier, ImmutableList.of(addDialect), false));
+        catalog.createView(identifier, view, false);
+
+        // set options
+        String key = UUID.randomUUID().toString();
+        String value = UUID.randomUUID().toString();
+        ViewChange setOption = ViewChange.setOption(key, value);
+        catalog.alterView(identifier, ImmutableList.of(setOption), false);
+        View catalogView = catalog.getView(identifier);
+        assertThat(catalogView.options()).containsEntry(key, value);
+
+        // remove options
+        catalog.alterView(identifier, ImmutableList.of(ViewChange.removeOption(key)), false);
+        catalogView = catalog.getView(identifier);
+        assertThat(catalogView.options()).doesNotContainKey(key);
+
+        // update comment
+        String newComment = "new comment";
+        catalog.alterView(
+                identifier, ImmutableList.of(ViewChange.updateComment(newComment)), false);
+        catalogView = catalog.getView(identifier);
+        // Asserting on the Optional itself reports an absent comment as an assertion failure
+        // instead of a NoSuchElementException from Optional.get().
+        assertThat(catalogView.comment()).contains(newComment);
+
+        // add dialect
+        catalog.alterView(identifier, ImmutableList.of(addDialect), false);
+        catalogView = catalog.getView(identifier);
+        assertThat(catalogView.query(addDialect.dialect())).isEqualTo(addDialect.query());
+        assertThrows(
+                Catalog.DialectAlreadyExistException.class,
+                () -> catalog.alterView(identifier, ImmutableList.of(addDialect), false));
+
+        // update dialect
+        ViewChange.UpdateDialect updateDialect =
+                (ViewChange.UpdateDialect)
+                        ViewChange.updateDialect("flink_1", "SELECT * FROM FLINK_TABLE_2");
+        catalog.alterView(identifier, ImmutableList.of(updateDialect), false);
+        catalogView = catalog.getView(identifier);
+        assertThat(catalogView.query(updateDialect.dialect())).isEqualTo(updateDialect.query());
+        assertThrows(
+                Catalog.DialectNotExistException.class,
+                () ->
+                        catalog.alterView(
+                                identifier,
+                                ImmutableList.of(
+                                        ViewChange.updateDialect(
+                                                "no_exist", "SELECT * FROM FLINK_TABLE_2")),
+                                false));
+
+        // drop dialect: afterwards a lookup for the dropped dialect yields the default query
+        ViewChange.DropDialect dropDialect =
+                (ViewChange.DropDialect) ViewChange.dropDialect(updateDialect.dialect());
+        catalog.alterView(identifier, ImmutableList.of(dropDialect), false);
+        catalogView = catalog.getView(identifier);
+        assertThat(catalogView.query(dropDialect.dialect())).isEqualTo(catalogView.query());
+        assertThrows(
+                Catalog.DialectNotExistException.class,
+                () ->
+                        catalog.alterView(
+                                identifier,
+                                ImmutableList.of(ViewChange.dropDialect("no_exist")),
+                                false));
+    }
+
     private TestPagedResponse generateTestPagedResponse(
             Map<String, String> queryParams,
             List<Integer> testData,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
index 974837c69a..c395d2e303 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.rest;
 
 import org.apache.paimon.rest.requests.AlterDatabaseRequest;
 import org.apache.paimon.rest.requests.AlterTableRequest;
+import org.apache.paimon.rest.requests.AlterViewRequest;
 import org.apache.paimon.rest.requests.CreateDatabaseRequest;
 import org.apache.paimon.rest.requests.CreateTableRequest;
 import org.apache.paimon.rest.requests.CreateViewRequest;
@@ -268,4 +269,15 @@ public class RESTObjectMapperTest {
                                 .getInstant();
         assertEquals(rollbackTableRequestByTagParseData.getTagName(), tagName);
     }
+
+    /**
+     * Serializes an {@link AlterViewRequest} to a string with {@code OBJECT_MAPPER}, parses it
+     * back, and checks that the parsed changes equal the original ones position by position.
+     */
+    @Test
+    public void alterViewRequestParseTest() throws Exception {
+        AlterViewRequest request = MockRESTMessage.alterViewRequest();
+        String requestStr = OBJECT_MAPPER.writeValueAsString(request);
+        AlterViewRequest parseData = OBJECT_MAPPER.readValue(requestStr, AlterViewRequest.class);
+        // assertEquals takes (expected, actual): the original request is the expectation, so a
+        // failure message reports the parsed value as the one that was wrong.
+        assertEquals(request.viewChanges().size(), parseData.viewChanges().size());
+        for (int i = 0; i < request.viewChanges().size(); i++) {
+            assertEquals(request.viewChanges().get(i), parseData.viewChanges().get(i));
+        }
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index f45b6c9240..ed4f3b8242 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -167,6 +167,8 @@ import static 
org.apache.paimon.utils.Preconditions.checkNotNull;
 /** Catalog for paimon. */
 public class FlinkCatalog extends AbstractCatalog {
 
+    public static final String DIALECT = "flink";
+
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkCatalog.class);
 
     private final ClassLoader classLoader;
@@ -346,7 +348,7 @@ public class FlinkCatalog extends AbstractCatalog {
                 org.apache.flink.table.api.Schema.newBuilder()
                         
.fromRowDataType(fromLogicalToDataType(toLogicalType(view.rowType())))
                         .build();
-        String query = view.query("flink");
+        String query = view.query(DIALECT);
         return Optional.of(
                 CatalogView.of(schema, view.comment().orElse(null), query, 
query, view.options()));
     }
@@ -456,7 +458,7 @@ public class FlinkCatalog extends AbstractCatalog {
                         identifier,
                         builder.build().getFields(),
                         query,
-                        Collections.singletonMap("flink", query),
+                        Collections.singletonMap(DIALECT, query),
                         table.getComment(),
                         table.getOptions());
         try {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterViewDialectProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterViewDialectProcedure.java
new file mode 100644
index 0000000000..cd2a0172ef
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterViewDialectProcedure.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.utils.StringUtils;
+import org.apache.paimon.view.ViewChange;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import static org.apache.paimon.flink.FlinkCatalog.DIALECT;
+
+/**
+ * Procedure that adds, updates or drops the query of one dialect of a view. Usage:
+ *
+ * <pre><code>
+ *  -- NOTE: use '' as placeholder for optional arguments
+ *
+ *  -- add dialect in the view
+ *  CALL sys.alter_view_dialect('view_identifier', 'add', 'flink', 'query')
+ *  CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 'add', `query` => 'query')
+ *
+ *  -- update dialect in the view
+ *  CALL sys.alter_view_dialect('view_identifier', 'update', 'flink', 'query')
+ *  CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 'update', `query` => 'query')
+ *
+ *  -- drop dialect in the view
+ *  CALL sys.alter_view_dialect('view_identifier', 'drop', 'flink')
+ *  CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 'drop')
+ *
+ * </code></pre>
+ *
+ * <p>When {@code engine} is null or blank, the dialect defaults to {@code FlinkCatalog.DIALECT}.
+ */
+public class AlterViewDialectProcedure extends ProcedureBase {
+
+    /**
+     * Applies a single dialect change to the given view.
+     *
+     * @param procedureContext procedure context; not used by this method
+     * @param view view identifier string, parsed with {@link Identifier#fromString}
+     * @param action one of {@code add}, {@code update} or {@code drop} (matched exactly)
+     * @param engine dialect name; null or blank selects {@code FlinkCatalog.DIALECT}
+     * @param query query for the dialect; required for {@code add} and {@code update}
+     * @return a one-element array containing {@code "Success"}
+     * @throws IllegalArgumentException if {@code action} is null or unsupported, or if {@code
+     *     query} is null or blank for {@code add} / {@code update}
+     */
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "view", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "action", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "engine", type = @DataTypeHint("STRING"), isOptional = true),
+                @ArgumentHint(name = "query", type = @DataTypeHint("STRING"), isOptional = true)
+            })
+    public String[] call(
+            ProcedureContext procedureContext,
+            String view,
+            String action,
+            String engine,
+            String query)
+            throws Catalog.ViewNotExistException, Catalog.DialectAlreadyExistException,
+                    Catalog.DialectNotExistException {
+        Identifier identifier = Identifier.fromString(view);
+        ViewChange viewChange;
+        String dialect = StringUtils.isNullOrWhitespaceOnly(engine) ? DIALECT : engine;
+        // Switching on a null String throws a bare NullPointerException; report the missing
+        // argument explicitly instead.
+        if (action == null) {
+            throw new IllegalArgumentException("action is required.");
+        }
+        switch (action) {
+            case "add":
+                {
+                    if (StringUtils.isNullOrWhitespaceOnly(query)) {
+                        throw new IllegalArgumentException("query is required for add action.");
+                    }
+                    viewChange = ViewChange.addDialect(dialect, query);
+                    break;
+                }
+            case "update":
+                {
+                    if (StringUtils.isNullOrWhitespaceOnly(query)) {
+                        throw new IllegalArgumentException("query is required for update action.");
+                    }
+                    viewChange = ViewChange.updateDialect(dialect, query);
+                    break;
+                }
+            case "drop":
+                {
+                    // Dropping needs only the dialect name; any query argument is ignored.
+                    viewChange = ViewChange.dropDialect(dialect);
+                    break;
+                }
+            default:
+                {
+                    throw new IllegalArgumentException("Unsupported action: " + action);
+                }
+        }
+        // NOTE(review): the trailing false looks like "ignore if not exists" being off, so a
+        // missing view surfaces as ViewNotExistException — confirm against Catalog#alterView.
+        catalog.alterView(identifier, ImmutableList.of(viewChange), false);
+        return new String[] {"Success"};
+    }
+
+    /** Returns the name this procedure is registered under: {@code alter_view_dialect}. */
+    @Override
+    public String identifier() {
+        return "alter_view_dialect";
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 184706854c..c6ae6b2595 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -89,3 +89,4 @@ 
org.apache.paimon.flink.procedure.RemoveUnexistingFilesProcedure
 org.apache.paimon.flink.procedure.ClearConsumersProcedure
 org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure
 org.apache.paimon.flink.procedure.RescaleProcedure
+org.apache.paimon.flink.procedure.AlterViewDialectProcedure
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
index 36efa4817c..3ec704a10c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
@@ -19,79 +19,21 @@
 package org.apache.paimon.flink;
 
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.options.CatalogOptions;
-import org.apache.paimon.rest.RESTCatalogInternalOptions;
-import org.apache.paimon.rest.RESTCatalogOptions;
-import org.apache.paimon.rest.RESTCatalogServer;
-import org.apache.paimon.rest.RESTFileIOTestLoader;
-import org.apache.paimon.rest.RESTTestFileIO;
 import org.apache.paimon.rest.RESTToken;
-import org.apache.paimon.rest.RESTTokenFileIO;
-import org.apache.paimon.rest.auth.AuthProvider;
-import org.apache.paimon.rest.auth.AuthProviderEnum;
-import org.apache.paimon.rest.auth.BearTokenAuthProvider;
-import org.apache.paimon.rest.responses.ConfigResponse;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 
 import org.apache.flink.types.Row;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
 
-import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /** ITCase for REST catalog. */
-class RESTCatalogITCase extends CatalogITCaseBase {
-
-    private static final String DATABASE_NAME = "mydb";
-    private static final String TABLE_NAME = "t1";
-
-    private RESTCatalogServer restCatalogServer;
-    private String serverUrl;
-    private String dataPath;
-    private String warehouse;
-    @TempDir java.nio.file.Path tempFile;
-
-    @BeforeEach
-    @Override
-    public void before() throws IOException {
-        String initToken = "init_token";
-        dataPath = tempFile.toUri().toString();
-        warehouse = UUID.randomUUID().toString();
-        ConfigResponse config =
-                new ConfigResponse(
-                        ImmutableMap.of(
-                                RESTCatalogInternalOptions.PREFIX.key(),
-                                "paimon",
-                                RESTTokenFileIO.DATA_TOKEN_ENABLED.key(),
-                                "true",
-                                CatalogOptions.WAREHOUSE.key(),
-                                warehouse),
-                        ImmutableMap.of());
-        AuthProvider authProvider = new BearTokenAuthProvider(initToken);
-        restCatalogServer = new RESTCatalogServer(dataPath, authProvider, 
config, warehouse);
-        restCatalogServer.start();
-        serverUrl = restCatalogServer.getUrl();
-        super.before();
-        sql(String.format("CREATE DATABASE %s", DATABASE_NAME));
-        sql(String.format("CREATE TABLE %s.%s (a STRING, b DOUBLE)", 
DATABASE_NAME, TABLE_NAME));
-    }
-
-    @AfterEach()
-    public void after() throws IOException {
-        sql(String.format("DROP TABLE  %s.%s", DATABASE_NAME, TABLE_NAME));
-        sql(String.format("DROP DATABASE %s", DATABASE_NAME));
-        restCatalogServer.shutdown();
-    }
+class RESTCatalogITCase extends RESTCatalogITCaseBase {
 
     @Test
     void testCreateTable() {
@@ -160,30 +102,4 @@ class RESTCatalogITCase extends CatalogITCaseBase {
         assertThat(batchSql(String.format("SELECT * FROM %s.%s", 
DATABASE_NAME, TABLE_NAME)))
                 .containsExactlyInAnyOrder(Row.of("1", 11.0D), Row.of("2", 
22.0D));
     }
-
-    @Override
-    protected Map<String, String> catalogOptions() {
-        String initToken = "init_token";
-        Map<String, String> options = new HashMap<>();
-        options.put("metastore", "rest");
-        options.put(CatalogOptions.WAREHOUSE.key(), warehouse);
-        options.put(RESTCatalogOptions.URI.key(), serverUrl);
-        options.put(RESTCatalogOptions.TOKEN.key(), initToken);
-        options.put(RESTCatalogOptions.TOKEN_PROVIDER.key(), 
AuthProviderEnum.BEAR.identifier());
-        options.put(RESTTokenFileIO.DATA_TOKEN_ENABLED.key(), "true");
-        options.put(
-                RESTTestFileIO.DATA_PATH_CONF_KEY,
-                dataPath.replaceFirst("file", RESTFileIOTestLoader.SCHEME));
-        return options;
-    }
-
-    @Override
-    protected String getTempDirPath() {
-        return this.dataPath;
-    }
-
-    @Override
-    protected boolean supportDefineWarehouse() {
-        return false;
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java
new file mode 100644
index 0000000000..5c214568ee
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.rest.RESTCatalogInternalOptions;
+import org.apache.paimon.rest.RESTCatalogOptions;
+import org.apache.paimon.rest.RESTCatalogServer;
+import org.apache.paimon.rest.RESTFileIOTestLoader;
+import org.apache.paimon.rest.RESTTestFileIO;
+import org.apache.paimon.rest.RESTTokenFileIO;
+import org.apache.paimon.rest.auth.AuthProvider;
+import org.apache.paimon.rest.auth.AuthProviderEnum;
+import org.apache.paimon.rest.auth.BearTokenAuthProvider;
+import org.apache.paimon.rest.responses.ConfigResponse;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Base ITCase for REST catalog.
+ *
+ * <p>Before each test a {@link RESTCatalogServer} is started on a fresh temp directory and the
+ * database {@link #DATABASE_NAME} with table {@link #TABLE_NAME} is created; after each test the
+ * server is shut down again.
+ */
+public abstract class RESTCatalogITCaseBase extends CatalogITCaseBase {
+
+    protected static final String DATABASE_NAME = "mydb";
+    protected static final String TABLE_NAME = "t1";
+
+    /** Bearer token shared by the server-side auth provider and the client catalog options. */
+    private static final String INIT_TOKEN = "init_token";
+
+    protected RESTCatalogServer restCatalogServer;
+
+    private String serverUrl;
+    private String dataPath;
+    private String warehouse;
+    @TempDir java.nio.file.Path tempFile;
+
+    /**
+     * Starts the REST catalog server, then lets the parent class set up the catalog and finally
+     * creates the default database and table.
+     *
+     * @throws IOException if the setup fails with an I/O error
+     */
+    @BeforeEach
+    @Override
+    public void before() throws IOException {
+        dataPath = tempFile.toUri().toString();
+        warehouse = UUID.randomUUID().toString();
+        ConfigResponse config =
+                new ConfigResponse(
+                        ImmutableMap.of(
+                                RESTCatalogInternalOptions.PREFIX.key(),
+                                "paimon",
+                                RESTTokenFileIO.DATA_TOKEN_ENABLED.key(),
+                                "true",
+                                CatalogOptions.WAREHOUSE.key(),
+                                warehouse),
+                        ImmutableMap.of());
+        AuthProvider authProvider = new BearTokenAuthProvider(INIT_TOKEN);
+        restCatalogServer = new RESTCatalogServer(dataPath, authProvider, config, warehouse);
+        restCatalogServer.start();
+        // The server URL must be known before super.before() reads catalogOptions().
+        serverUrl = restCatalogServer.getUrl();
+        super.before();
+        sql(String.format("CREATE DATABASE %s", DATABASE_NAME));
+        sql(String.format("CREATE TABLE %s.%s (a STRING, b DOUBLE)", DATABASE_NAME, TABLE_NAME));
+    }
+
+    /**
+     * Shuts down the server started by {@link #before()} so that server instances do not pile up
+     * across test methods. Catalog contents are not dropped here: they live under the per-test
+     * temp directory, and subclasses may have created extra objects (for example views).
+     *
+     * @throws IOException if shutting down the server fails
+     */
+    @AfterEach
+    public void after() throws IOException {
+        // Null when before() failed before the server was created.
+        if (restCatalogServer != null) {
+            restCatalogServer.shutdown();
+        }
+    }
+
+    /** Catalog options that point the Flink catalog at the server started in {@link #before()}. */
+    @Override
+    protected Map<String, String> catalogOptions() {
+        Map<String, String> options = new HashMap<>();
+        options.put("metastore", "rest");
+        options.put(CatalogOptions.WAREHOUSE.key(), warehouse);
+        options.put(RESTCatalogOptions.URI.key(), serverUrl);
+        options.put(RESTCatalogOptions.TOKEN.key(), INIT_TOKEN);
+        options.put(RESTCatalogOptions.TOKEN_PROVIDER.key(), AuthProviderEnum.BEAR.identifier());
+        options.put(RESTTokenFileIO.DATA_TOKEN_ENABLED.key(), "true");
+        // Swap the leading "file" of the temp-dir URI for the test file IO scheme.
+        options.put(
+                RESTTestFileIO.DATA_PATH_CONF_KEY,
+                dataPath.replaceFirst("file", RESTFileIOTestLoader.SCHEME));
+        return options;
+    }
+
+    @Override
+    protected String getTempDirPath() {
+        return this.dataPath;
+    }
+
+    @Override
+    protected boolean supportDefineWarehouse() {
+        return false;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/AlterViewDialectITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/AlterViewDialectITCase.java
new file mode 100644
index 0000000000..dcefde8d9b
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/AlterViewDialectITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.FlinkCatalog;
+import org.apache.paimon.flink.RESTCatalogITCaseBase;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case for {@link AlterViewDialectProcedure}. */
+public class AlterViewDialectITCase extends RESTCatalogITCaseBase {
+
+    /**
+     * Exercises the update, drop and add actions of {@code sys.alter_view_dialect}, first with
+     * positional arguments and then with named arguments that leave the dialect out. The steps
+     * build on each other, so their order matters.
+     */
+    @Test
+    public void testAlterViewDialect() {
+
+        // Seed the table created by the base class.
+        sql(
+                String.format(
+                        "INSERT INTO %s.%s VALUES ('1', 11), ('2', 22)",
+                        DATABASE_NAME, TABLE_NAME));
+        String viewName = "view_test";
+        String query =
+                String.format("SELECT * FROM `%s`.`%s` WHERE `b` > 1", 
DATABASE_NAME, TABLE_NAME);
+        sql(String.format("CREATE VIEW %s.%s AS %s", DATABASE_NAME, viewName, 
query));
+        String newQuery =
+                String.format("SELECT * FROM `%s`.`%s` WHERE `b` > 2", 
DATABASE_NAME, TABLE_NAME);
+
+        // Positional form: view identifier, action, dialect, query.
+        List<Row> result =
+                sql(
+                        String.format(
+                                "CALL sys.alter_view_dialect('%s.%s', 
'update', '%s', '%s')",
+                                DATABASE_NAME, viewName, FlinkCatalog.DIALECT, 
newQuery));
+        assertThat(result.toString()).contains("Success");
+        result = sql(String.format("SHOW CREATE VIEW %s.%s", DATABASE_NAME, 
viewName));
+        assertThat(result.toString()).contains(newQuery);
+
+        // Drop the dialect, then add it back with the original query.
+        result =
+                sql(
+                        String.format(
+                                "CALL sys.alter_view_dialect('%s.%s', 'drop', 
'%s')",
+                                DATABASE_NAME, viewName, 
FlinkCatalog.DIALECT));
+        assertThat(result.toString()).contains("Success");
+
+        result =
+                sql(
+                        String.format(
+                                "CALL sys.alter_view_dialect('%s.%s', 'add', 
'%s', '%s')",
+                                DATABASE_NAME, viewName, FlinkCatalog.DIALECT, 
query));
+        assertThat(result.toString()).contains("Success");
+        result = sql(String.format("SHOW CREATE VIEW %s.%s", DATABASE_NAME, 
viewName));
+        assertThat(result.toString()).contains(query);
+
+        // Named-argument form: the dialect argument is omitted in all three calls below.
+        sql(
+                String.format(
+                        "CALL sys.alter_view_dialect(`view` => '%s.%s', 
`action` => 'update', `query` => '%s')",
+                        DATABASE_NAME, viewName, newQuery));
+        result = sql(String.format("SHOW CREATE VIEW %s.%s", DATABASE_NAME, 
viewName));
+        assertThat(result.toString()).contains(newQuery);
+        sql(
+                String.format(
+                        "CALL sys.alter_view_dialect(`view` => '%s.%s', 
`action` => 'drop')",
+                        DATABASE_NAME, viewName));
+        result = sql(String.format("SHOW CREATE VIEW %s.%s", DATABASE_NAME, 
viewName));
+        // NOTE(review): after the drop the output still contains the original `b` > 1 query,
+        // presumably the view's default query; confirm against the catalog implementation.
+        assertThat(result.toString()).contains("`b` > 1");
+        sql(
+                String.format(
+                        "CALL sys.alter_view_dialect(`view` => '%s.%s', 
`action` => 'add', `query` => '%s')",
+                        DATABASE_NAME, viewName, query));
+        result = sql(String.format("SHOW CREATE VIEW %s.%s", DATABASE_NAME, 
viewName));
+        assertThat(result.toString()).contains(query);
+    }
+}
diff --git a/paimon-open-api/rest-catalog-open-api.yaml 
b/paimon-open-api/rest-catalog-open-api.yaml
index 44b1a63385..26b9f1234a 100644
--- a/paimon-open-api/rest-catalog-open-api.yaml
+++ b/paimon-open-api/rest-catalog-open-api.yaml
@@ -1309,6 +1309,53 @@ paths:
                 $ref: '#/components/schemas/ErrorResponse'
         "500":
           description: Internal Server Error
+    # Alter view: applies the changes listed in the AlterViewRequest body to an existing view.
+    post:
+      tags:
+        - view
+      summary: Alter view
+      operationId: alterView
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: view
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/AlterViewRequest'
+      responses:
+        "200":
+          description: Success, no content
+        "401":
+          description: Unauthorized
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ErrorResponse'
+        "404":
+          description: Resource not found
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ErrorResponse'
+        # Declared by RESTCatalogController.alterView as well; kept in sync with that annotation.
+        "409":
+          description: Resource has exist
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ErrorResponse'
+        "500":
+          description: Internal Server Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ErrorResponse'
     delete:
       tags:
         - view
@@ -1521,7 +1568,7 @@ components:
           type: string
         resourceType:
           type: string
-          enum: ["DATABASE", "TABLE", "COLUMN", "SNAPSHOT", "BRANCH", "TAG", 
"VIEW", "UNKNOWN"]
+          enum: ["DATABASE", "TABLE", "COLUMN", "SNAPSHOT", "BRANCH", "TAG", 
"VIEW", "DIALECT", "UNKNOWN"]
         resourceName:
           type: string
         code:
@@ -1541,6 +1588,80 @@ components:
           $ref: '#/components/schemas/Identifier'
         schema:
           $ref: '#/components/schemas/ViewSchema'
+    AlterViewRequest:
+      type: object
+      properties:
+        changes:
+          type: array
+          items:
+            $ref: '#/components/schemas/ViewChange'
+    ViewChange:
+      anyOf:
+        - $ref: '#/components/schemas/SetViewOption'
+        - $ref: '#/components/schemas/RemoveViewOption'
+        - $ref: '#/components/schemas/UpdateViewComment'
+        - $ref: '#/components/schemas/AddDialect'
+        - $ref: '#/components/schemas/UpdateDialect'
+        - $ref: '#/components/schemas/DropDialect'
+      required:
+        - action
+      properties:
+        action:
+          type: string
+    SetViewOption:
+      type: object
+      properties:
+        action:
+          type: string
+          const: "setOption"
+        key:
+          type: string
+        value:
+          type: string
+    RemoveViewOption:
+      type: object
+      properties:
+        action:
+          type: string
+          const: "removeOption"
+        key:
+          type: string
+    # View change that carries the new comment for the view.
+    # NOTE(review): the table-level UpdateComment schema uses action "updateComment"; confirm
+    # that "comment" matches the action value serialized for this view change.
+    UpdateViewComment:
+      type: object
+      properties:
+        action:
+          type: string
+          const: "comment"
+        comment:
+          type: string
+    AddDialect:
+      type: object
+      properties:
+        action:
+          type: string
+          const: "addDialect"
+        dialect:
+          type: string
+        query:
+          type: string
+    UpdateDialect:
+      type: object
+      properties:
+        action:
+          type: string
+          const: "updateDialect"
+        dialect:
+          type: string
+        query:
+          type: string
+    DropDialect:
+      type: object
+      properties:
+        action:
+          type: string
+          const: "dropDialect"
+        dialect:
+          type: string
     DataField:
       type: object
       properties:
@@ -1668,12 +1789,17 @@ components:
         - $ref: '#/components/schemas/UpdateColumnType'
         - $ref: '#/components/schemas/UpdateColumnPosition'
         - $ref: '#/components/schemas/UpdateColumnNullability'
+      required:
+        - action
+      properties:
+        action:
+          type: string
     SetOption:
       type: object
       properties:
         action:
           type: string
-          enum: ["setOption"]
+          const: "setOption"
         key:
           type: string
         value:
@@ -1683,7 +1809,7 @@ components:
       properties:
         action:
           type: string
-          enum: ["removeOption"]
+          const: "removeOption"
         key:
           type: string
     UpdateComment:
@@ -1691,7 +1817,7 @@ components:
       properties:
         action:
           type: string
-          enum: ["updateComment"]
+          const: "updateComment"
         comment:
           type: string
     AddColumn:
@@ -1699,7 +1825,7 @@ components:
       properties:
         action:
           type: string
-          enum: ["addColumn"]
+          const: "addColumn"
         fieldNames:
           type: array
           items:
@@ -1715,7 +1841,7 @@ components:
       properties:
         action:
           type: string
-          enum: ["renameColumn"]
+          const: "renameColumn"
         fieldNames:
           type: array
           items:
@@ -1727,7 +1853,7 @@ components:
       properties:
         action:
           type: string
-          enum: ["dropColumn"]
+          const: "dropColumn"
         fieldNames:
           type: array
           items:
@@ -1737,7 +1863,7 @@ components:
       properties:
         action:
           type: string
-          enum: [ "updateColumnComment" ]
+          const: "updateColumnComment"
         fieldNames:
           type: array
           items:
@@ -1749,7 +1875,7 @@ components:
       properties:
         action:
           type: string
-          enum: [ "updateColumnType" ]
+          const: "updateColumnType"
         fieldNames:
           type: array
           items:
@@ -1763,7 +1889,7 @@ components:
       properties:
         action:
           type: string
-          enum: [ "updateColumnPosition" ]
+          const: "updateColumnPosition"
         move:
           $ref: '#/components/schemas/Move'
     UpdateColumnNullability:
@@ -1771,7 +1897,7 @@ components:
       properties:
         action:
           type: string
-          enum: [ "update_column_nullability" ]
+          const: "update_column_nullability"
         fieldNames:
           type: array
           items:
@@ -1812,12 +1938,17 @@ components:
       anyOf:
         - $ref: '#/components/schemas/SnapshotInstant'
         - $ref: '#/components/schemas/TagInstant'
+      required:
+        - type
+      properties:
+        type:
+          type: string
     SnapshotInstant:
       type: object
       properties:
         'type':
           type: string
-          enum: [ "snapshot" ]
+          const: "snapshot"
         snapshotId:
           type: integer
           format: int64
@@ -1826,7 +1957,7 @@ components:
       properties:
         'type':
           type: string
-          enum: [ "tag" ]
+          const: "tag"
         tagName:
           type: string
     Snapshot:
diff --git 
a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
 
b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
index 307a088ad4..1fd2ac2fbf 100644
--- 
a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
+++ 
b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
@@ -21,6 +21,7 @@ package org.apache.paimon.open.api;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.rest.requests.AlterDatabaseRequest;
 import org.apache.paimon.rest.requests.AlterTableRequest;
+import org.apache.paimon.rest.requests.AlterViewRequest;
 import org.apache.paimon.rest.requests.CommitTableRequest;
 import org.apache.paimon.rest.requests.CreateBranchRequest;
 import org.apache.paimon.rest.requests.CreateDatabaseRequest;
@@ -896,4 +897,32 @@ public class RESTCatalogController {
             @PathVariable String prefix,
             @PathVariable String database,
             @PathVariable String view) {}
+
+    @Operation(
+            summary = "Alter view",
+            tags = {"view"})
+    @ApiResponses({
+        @ApiResponse(responseCode = "200", description = "Success, no 
content"),
+        @ApiResponse(
+                responseCode = "401",
+                description = "Unauthorized",
+                content = {@Content(schema = @Schema(implementation = 
ErrorResponse.class))}),
+        @ApiResponse(
+                responseCode = "404",
+                description = "Resource not found",
+                content = {@Content(schema = @Schema(implementation = 
ErrorResponse.class))}),
+        @ApiResponse(
+                responseCode = "409",
+                description = "Resource has exist",
+                content = {@Content(schema = @Schema(implementation = 
ErrorResponse.class))}),
+        @ApiResponse(
+                responseCode = "500",
+                content = {@Content(schema = @Schema(implementation = 
ErrorResponse.class))})
+    })
+    @PostMapping("/v1/{prefix}/databases/{database}/views/{view}")
+    public void alterView(
+            @PathVariable String prefix,
+            @PathVariable String database,
+            @PathVariable String view,
+            @RequestBody AlterViewRequest request) {}
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index e1cf70d3bd..53ae3758d5 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark;
 
+import org.apache.paimon.spark.procedure.AlterViewDialectProcedure;
 import org.apache.paimon.spark.procedure.ClearConsumersProcedure;
 import org.apache.paimon.spark.procedure.CompactManifestProcedure;
 import org.apache.paimon.spark.procedure.CompactProcedure;
@@ -100,6 +101,7 @@ public class SparkProcedures {
         procedureBuilders.put("compact_manifest", 
CompactManifestProcedure::builder);
         procedureBuilders.put("refresh_object_table", 
RefreshObjectTableProcedure::builder);
         procedureBuilders.put("clear_consumers", 
ClearConsumersProcedure::builder);
+        procedureBuilders.put("alter_view_dialect", 
AlterViewDialectProcedure::builder);
         return procedureBuilders.build();
     }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java
index 7b4e7dd57e..567b085691 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java
@@ -37,6 +37,8 @@ import static 
org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;
 /** Catalog methods for working with Views. */
 public interface SupportView extends WithPaimonCatalog {
 
+    String DIALECT = "spark";
+
     default List<String> listViews(String[] namespace) throws 
NoSuchNamespaceException {
         try {
             checkNamespace(namespace);
@@ -67,7 +69,7 @@ public interface SupportView extends WithPaimonCatalog {
                                     paimonIdent,
                                     toPaimonRowType(schema).getFields(),
                                     queryText,
-                                    Collections.singletonMap("spark", 
queryText),
+                                    Collections.singletonMap(DIALECT, 
queryText),
                                     comment,
                                     properties),
                             ignoreIfExists);
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterViewDialectProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterViewDialectProcedure.java
new file mode 100644
index 0000000000..029ecb29e6
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterViewDialectProcedure.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.spark.catalog.SupportView;
+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
+import org.apache.paimon.spark.utils.CatalogUtils;
+import org.apache.paimon.utils.StringUtils;
+import org.apache.paimon.view.ViewChange;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Procedure that adds, updates or drops the query stored for one dialect of a view. Usage:
+ *
+ * <pre><code>
+ *  -- NOTE: use '' as placeholder for optional arguments
+ *
+ *  -- add a dialect to the view
+ *  CALL sys.alter_view_dialect('view_identifier', 'add', 'spark', 'query')
+ *  CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 'add', `query` => 'query')
+ *
+ *  -- update a dialect of the view
+ *  CALL sys.alter_view_dialect('view_identifier', 'update', 'spark', 'query')
+ *  CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 'update', `query` => 'query')
+ *
+ *  -- drop a dialect from the view
+ *  CALL sys.alter_view_dialect('view_identifier', 'drop', 'spark')
+ *  CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 'drop')
+ *
+ * </code></pre>
+ *
+ * <p>When the {@code engine} argument is null or blank, {@link SupportView#DIALECT} is used.
+ */
+public class AlterViewDialectProcedure extends BaseProcedure {
+
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
+                    });
+
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {
+                ProcedureParameter.required("view", StringType),
+                ProcedureParameter.required("action", StringType),
+                ProcedureParameter.optional("engine", StringType),
+                ProcedureParameter.optional("query", StringType)
+            };
+
+    protected AlterViewDialectProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    /** Creates the builder used to instantiate this procedure for a table catalog. */
+    public static ProcedureBuilder builder() {
+        return new BaseProcedure.Builder<AlterViewDialectProcedure>() {
+            @Override
+            public AlterViewDialectProcedure doBuild() {
+                return new AlterViewDialectProcedure(tableCatalog());
+            }
+        };
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    @Override
+    public String description() {
+        return "AlterViewDialectProcedure";
+    }
+
+    /**
+     * Resolves the view and the requested change from {@code args} (view, action, engine, query)
+     * and applies it through the Paimon catalog.
+     *
+     * @return a single row holding {@code true}
+     * @throws IllegalArgumentException if the action is unknown, or if add/update has no query
+     * @throws RuntimeException wrapping any exception thrown by the catalog call
+     */
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        Catalog paimonCatalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
+        Identifier view =
+                CatalogUtils.toIdentifier(toIdentifier(args.getString(0), PARAMETERS[0].name()));
+
+        // Optional arguments are probed through genericGet before getString is called on them.
+        GenericInternalRow row = (GenericInternalRow) args;
+        String engine = row.genericGet(2) == null ? null : args.getString(2);
+        String dialect = StringUtils.isNullOrWhitespaceOnly(engine) ? SupportView.DIALECT : engine;
+        String query = row.genericGet(3) == null ? null : args.getString(3);
+
+        ViewChange change = toViewChange(args.getString(1), dialect, query);
+        try {
+            paimonCatalog.alterView(view, ImmutableList.of(change), false);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return new InternalRow[] {newInternalRow(true)};
+    }
+
+    /** Maps the action name onto the matching {@link ViewChange}, validating the query first. */
+    private static ViewChange toViewChange(String action, String dialect, String query) {
+        switch (action) {
+            case "add":
+                if (StringUtils.isNullOrWhitespaceOnly(query)) {
+                    throw new IllegalArgumentException("query is required for add action.");
+                }
+                return ViewChange.addDialect(dialect, query);
+            case "update":
+                if (StringUtils.isNullOrWhitespaceOnly(query)) {
+                    throw new IllegalArgumentException("query is required for update action.");
+                }
+                return ViewChange.updateDialect(dialect, query);
+            case "drop":
+                return ViewChange.dropDialect(dialect);
+            default:
+                throw new IllegalArgumentException("Unsupported action: " + action);
+        }
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
index 4f9ee6ec24..45de6a1c76 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
@@ -59,7 +59,8 @@ case class PaimonViewResolver(spark: SparkSession)
   }
 
   private def createViewRelation(nameParts: Seq[String], view: View): 
LogicalPlan = {
-    val parsedPlan = parseViewText(nameParts.toArray.mkString("."), 
view.query("spark"))
+    val parsedPlan =
+      parseViewText(nameParts.toArray.mkString("."), 
view.query(SupportView.DIALECT))
 
     val aliases = 
SparkTypeUtils.fromPaimonRowType(view.rowType()).fields.zipWithIndex.map {
       case (expected, pos) =>
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala
index 941fa24bad..6e807d63ea 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala
@@ -23,7 +23,6 @@ import 
org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
 import org.apache.paimon.view.View
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
GenericInternalRow}
 import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, 
quoteIfNeeded, StringUtils}
 import org.apache.spark.sql.connector.catalog.Identifier
@@ -129,7 +128,7 @@ case class ShowCreatePaimonViewExec(output: Seq[Attribute], 
catalog: SupportView
     showDataColumns(view, builder)
     showComment(view, builder)
     showProperties(view, builder)
-    builder ++= s"AS\n${view.query("spark")}\n"
+    builder ++= s"AS\n${view.query(SupportView.DIALECT)}\n"
 
     Seq(new GenericInternalRow(values = 
Array(UTF8String.fromString(builder.toString))))
   }
@@ -203,7 +202,7 @@ case class DescribePaimonViewExec(
     rows += row("# Detailed View Information", "", "")
     rows += row("Name", view.fullName(), "")
     rows += row("Comment", view.comment().orElse(""), "")
-    rows += row("View Text", view.query("spark"), "")
+    rows += row("View Text", view.query(SupportView.DIALECT), "")
     rows += row(
       "View Query Output Columns",
       view.rowType().getFieldNames.asScala.mkString("[", ", ", "]"),
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonRestCatalogSparkTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonRestCatalogSparkTestBase.scala
new file mode 100644
index 0000000000..124a089031
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonRestCatalogSparkTestBase.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.paimon.options.CatalogOptions
+import org.apache.paimon.rest.{RESTCatalogFactory, RESTCatalogInternalOptions, 
RESTCatalogServer}
+import org.apache.paimon.rest.auth.{AuthProviderEnum, BearTokenAuthProvider}
+import org.apache.paimon.rest.responses.ConfigResponse
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions
+
+import java.util.UUID
+
+/** Spark test base whose `paimon` catalog talks to a RESTCatalogServer started by the suite. */
+class PaimonRestCatalogSparkTestBase extends PaimonSparkTestBase {
+  private var restCatalogServer: RESTCatalogServer = null
+  private var serverUrl: String = null
+  private var warehouse: String = null
+  private val initToken = "init_token"
+
+  override protected def beforeAll(): Unit = {
+    warehouse = UUID.randomUUID.toString
+    val config = new ConfigResponse(
+      ImmutableMap.of(
+        RESTCatalogInternalOptions.PREFIX.key,
+        "paimon",
+        CatalogOptions.WAREHOUSE.key,
+        warehouse),
+      ImmutableMap.of())
+    val authProvider = new BearTokenAuthProvider(initToken)
+    restCatalogServer =
+      new RESTCatalogServer(tempDBDir.getCanonicalPath, authProvider, config, warehouse)
+    restCatalogServer.start()
+    serverUrl = restCatalogServer.getUrl
+    super.beforeAll() // last on purpose: sparkConf needs serverUrl (presumably read from here)
+  }
+
+  override protected def afterAll(): Unit = {
+    try super.afterAll()
+    finally {
+      // Still null when beforeAll failed before creating the server; skip shutdown then.
+      Option(restCatalogServer).foreach(_.shutdown())
+    }
+  }
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.sql.catalog.paimon.metastore", RESTCatalogFactory.IDENTIFIER)
+      .set("spark.sql.catalog.paimon.uri", serverUrl)
+      .set("spark.sql.catalog.paimon.token", initToken)
+      .set("spark.sql.catalog.paimon.warehouse", warehouse)
+      .set("spark.sql.catalog.paimon.token.provider", AuthProviderEnum.BEAR.identifier)
+  }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterViewDialectProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterViewDialectProcedureTest.scala
new file mode 100644
index 0000000000..1db9410a46
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterViewDialectProcedureTest.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonRestCatalogSparkTestBase
+import org.apache.paimon.spark.catalog.SupportView
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions
+
+/** Checks that `sys.alter_view_dialect` can drop, add and update a view dialect from Spark SQL. */
+class AlterViewDialectProcedureTest extends PaimonRestCatalogSparkTestBase {
+  test("test alter view dialect procedure") {
+    val viewName = "view_test"
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING)
+                 |""".stripMargin)
+
+    val query = "SELECT * FROM T WHERE `id` > 1"
+    spark.sql(s"""
+                 |CREATE VIEW $viewName as $query
+                 |""".stripMargin)
+    // Asserts that the 'View Text' row of `desc extended` equals the expected query.
+    def checkViewQuery(view: String, expected: String) = {
+      val viewText = spark
+        .sql(s"desc extended $view")
+        .filter("col_name = 'View Text'")
+        .select("data_type")
+        .collect()(0)(0)
+      Assertions.assertThat(viewText).isEqualTo(expected)
+    }
+    checkViewQuery(viewName, query)
+    // Positional arguments: (view, action, dialect[, query]).
+    checkAnswer(
+      spark.sql(s"CALL sys.alter_view_dialect('$viewName', 'drop', '${SupportView.DIALECT}')"),
+      Row(true))
+
+    checkAnswer(
+      spark.sql(
+        s"CALL sys.alter_view_dialect('$viewName', 'add', '${SupportView.DIALECT}', '$query')"),
+      Row(true))
+
+    checkViewQuery(viewName, query)
+
+    val newQuery = "SELECT * FROM T WHERE `id` > 2"
+
+    checkAnswer(
+      spark.sql(
+        s"CALL sys.alter_view_dialect('$viewName', 'update', '${SupportView.DIALECT}', '$newQuery')"),
+      Row(true))
+
+    checkViewQuery(viewName, newQuery)
+    // Named arguments without `dialect`; presumably it defaults to SupportView.DIALECT.
+    checkAnswer(
+      spark.sql(s"CALL sys.alter_view_dialect(`view` => '$viewName', `action` => 'drop')"),
+      Row(true))
+
+    checkAnswer(
+      spark.sql(
+        s"CALL sys.alter_view_dialect(`view` => '$viewName', `action` => 'add', `query` => '$query')"),
+      Row(true))
+    checkViewQuery(viewName, query)
+
+    checkAnswer(
+      spark.sql(
+        s"CALL sys.alter_view_dialect(`view` => '$viewName', `action` => 'update', `query` => '$newQuery')"),
+      Row(true))
+    checkViewQuery(viewName, newQuery)
+  }
+}

Reply via email to