This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 08a1356dd7 [core] support alter view and alter view's dialect
procedure (#5335)
08a1356dd7 is described below
commit 08a1356dd720bc1eb28d05ae969021d1f4d3805e
Author: jerry <[email protected]>
AuthorDate: Wed Mar 26 18:37:19 2025 +0800
[core] support alter view and alter view's dialect procedure (#5335)
---
docs/content/flink/procedures.md | 32 ++
docs/content/spark/procedures.md | 21 ++
.../java/org/apache/paimon/catalog/Catalog.java | 71 +++++
.../org/apache/paimon/catalog/DelegateCatalog.java | 7 +
.../rest/ExponentialHttpRetryInterceptor.java | 10 +-
.../java/org/apache/paimon/rest/RESTCatalog.java | 24 ++
.../paimon/rest/requests/AlterViewRequest.java | 49 +++
.../paimon/rest/responses/ErrorResponse.java | 2 +
.../java/org/apache/paimon/view/ViewChange.java | 354 +++++++++++++++++++++
.../org/apache/paimon/catalog/CatalogTestBase.java | 40 +--
.../rest/ExponentialHttpRetryInterceptorTest.java | 44 ++-
.../org/apache/paimon/rest/MockRESTMessage.java | 17 +-
.../org/apache/paimon/rest/RESTCatalogServer.java | 98 +++++-
.../org/apache/paimon/rest/RESTCatalogTest.java | 74 +++++
.../apache/paimon/rest/RESTObjectMapperTest.java | 12 +
.../java/org/apache/paimon/flink/FlinkCatalog.java | 6 +-
.../flink/procedure/AlterViewDialectProcedure.java | 109 +++++++
.../services/org.apache.paimon.factories.Factory | 1 +
.../org/apache/paimon/flink/RESTCatalogITCase.java | 86 +----
.../apache/paimon/flink/RESTCatalogITCaseBase.java | 106 ++++++
.../flink/procedure/AlterViewDialectITCase.java | 92 ++++++
paimon-open-api/rest-catalog-open-api.yaml | 157 ++++++++-
.../paimon/open/api/RESTCatalogController.java | 29 ++
.../org/apache/paimon/spark/SparkProcedures.java | 2 +
.../apache/paimon/spark/catalog/SupportView.java | 4 +-
.../spark/procedure/AlterViewDialectProcedure.java | 152 +++++++++
.../catalyst/analysis/PaimonViewResolver.scala | 3 +-
.../paimon/spark/execution/PaimonViewExec.scala | 5 +-
.../spark/PaimonRestCatalogSparkTestBase.scala | 73 +++++
.../procedure/AlterViewDialectProcedureTest.scala | 86 +++++
30 files changed, 1611 insertions(+), 155 deletions(-)
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 93ec13aec0..e191108068 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -799,5 +799,37 @@ All available procedures are listed below.
CALL sys.rescale(`table` => 'default.T', `bucket_num` => 16,
`partition` => 'dt=20250217,hh=08')
</td>
</tr>
+ <tr>
+ <td>alter_view_dialect</td>
+ <td>
+ -- add dialect in the view<br/>
+ CALL [catalog.]sys.alter_view_dialect('view_identifier', 'add',
'flink', 'query')<br/>
+ CALL [catalog.]sys.alter_view_dialect(`view` => 'view_identifier',
`action` => 'add', `query` => 'query')<br/><br/>
+ -- update dialect in the view<br/>
+ CALL [catalog.]sys.alter_view_dialect('view_identifier', 'update',
'flink', 'query')<br/>
+ CALL [catalog.]sys.alter_view_dialect(`view` => 'view_identifier',
`action` => 'update', `query` => 'query')<br/><br/>
+ -- drop dialect in the view<br/>
+ CALL [catalog.]sys.alter_view_dialect('view_identifier', 'drop',
'flink')<br/><br/>
+ CALL [catalog.]sys.alter_view_dialect(`view` => 'view_identifier',
`action` => 'drop')<br/><br/>
+ </td>
+ <td>
+ To alter view dialect. Arguments:
+ <li>view: the target view identifier. Cannot be empty.</li>
+ <li>action: define change action like: add, update, drop. Cannot
be empty.</li>
+ <li>engine: when engine which is not flink need define it.</li>
+ <li>query: query for the dialect when action is add and update it
couldn't be empty.</li>
+ </td>
+ <td>
+ -- add dialect in the view<br/>
+ CALL sys.alter_view_dialect('view_identifier', 'add', 'flink',
'query')<br/>
+ CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` =>
'add', `query` => 'query')<br/><br/>
+ -- update dialect in the view<br/>
+ CALL sys.alter_view_dialect('view_identifier', 'update', 'flink',
'query')<br/>
+ CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` =>
'update', `query` => 'query')<br/><br/>
+ -- drop dialect in the view<br/>
+ CALL sys.alter_view_dialect('view_identifier', 'drop', 'flink')<br/>
+ CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` =>
'drop')<br/><br/>
+ </td>
+ </tr>
</tbody>
</table>
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index e534bcb480..fb46b6266c 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -380,5 +380,26 @@ This section introduce all available spark procedures
about paimon.
CALL sys.compact_manifest(`table` => 'default.T')
</td>
</tr>
+ <tr>
+ <td>alter_view_dialect</td>
+ <td>
+ To alter view dialect. Arguments:
+ <li>view: the target view identifier. Cannot be empty.</li>
+ <li>action: define change action like: add, update, drop. Cannot
be empty.</li>
+ <li>engine: when engine which is not spark need define it.</li>
+ <li>query: query for the dialect when action is add and update it
couldn't be empty.</li>
+ </td>
+ <td>
+ -- add dialect in the view<br/>
+ CALL sys.alter_view_dialect('view_identifier', 'add', 'spark',
'query')<br/>
+ CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` =>
'add', `query` => 'query')<br/><br/>
+ -- update dialect in the view<br/>
+ CALL sys.alter_view_dialect('view_identifier', 'update', 'spark',
'query')<br/>
+ CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` =>
'update', `query` => 'query')<br/><br/>
+ -- drop dialect in the view<br/>
+ CALL sys.alter_view_dialect('view_identifier', 'drop', 'spark')<br/>
+ CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` =>
'drop')<br/><br/>
+ </td>
+ </tr>
</tbody>
</table>
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 3698f7a670..ed87935736 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -29,6 +29,7 @@ import org.apache.paimon.table.Instant;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableSnapshot;
import org.apache.paimon.view.View;
+import org.apache.paimon.view.ViewChange;
import javax.annotation.Nullable;
@@ -437,6 +438,21 @@ public interface Catalog extends AutoCloseable {
throw new UnsupportedOperationException();
}
+ /**
+ * Alter a view.
+ *
+ * @param view identifier of the view to alter
+ * @param viewChanges - changes of view
+ * @param ignoreIfNotExists
+ * @throws ViewNotExistException if the view does not exist
+ * @throws DialectAlreadyExistException if the dialect already exists
+ * @throws DialectNotExistException if the dialect not exists
+ */
+ default void alterView(Identifier view, List<ViewChange> viewChanges,
boolean ignoreIfNotExists)
+ throws ViewNotExistException, DialectAlreadyExistException,
DialectNotExistException {
+ throw new UnsupportedOperationException();
+ }
+
// ======================= repair methods ===============================
/**
@@ -884,6 +900,34 @@ public interface Catalog extends AutoCloseable {
}
}
+ /** Exception for trying to add a dialect that already exists. */
+ class DialectAlreadyExistException extends Exception {
+
+ private static final String MSG = "Dialect %s in view %s already
exists.";
+
+ private final Identifier identifier;
+ private final String dialect;
+
+ public DialectAlreadyExistException(Identifier identifier, String
dialect) {
+ this(identifier, dialect, null);
+ }
+
+ public DialectAlreadyExistException(
+ Identifier identifier, String dialect, Throwable cause) {
+ super(String.format(MSG, dialect, identifier.getFullName()),
cause);
+ this.identifier = identifier;
+ this.dialect = dialect;
+ }
+
+ public Identifier identifier() {
+ return identifier;
+ }
+
+ public String dialect() {
+ return dialect;
+ }
+ }
+
/** Exception for trying to create a branch that already exists. */
class BranchAlreadyExistException extends Exception {
@@ -964,4 +1008,31 @@ public interface Catalog extends AutoCloseable {
return tag;
}
}
+
+ /** Exception for trying to update dialect that doesn't exist. */
+ class DialectNotExistException extends Exception {
+
+ private static final String MSG = "Dialect %s in view %s doesn't
exist.";
+
+ private final Identifier identifier;
+ private final String dialect;
+
+ public DialectNotExistException(Identifier identifier, String dialect)
{
+ this(identifier, dialect, null);
+ }
+
+ public DialectNotExistException(Identifier identifier, String dialect,
Throwable cause) {
+ super(String.format(MSG, dialect, identifier.getFullName()),
cause);
+ this.identifier = identifier;
+ this.dialect = dialect;
+ }
+
+ public Identifier identifier() {
+ return identifier;
+ }
+
+ public String dialect() {
+ return dialect;
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index 756881af17..47064f8e8f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -28,6 +28,7 @@ import org.apache.paimon.table.Instant;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableSnapshot;
import org.apache.paimon.view.View;
+import org.apache.paimon.view.ViewChange;
import javax.annotation.Nullable;
@@ -251,6 +252,12 @@ public abstract class DelegateCatalog implements Catalog {
wrapped.renameView(fromView, toView, ignoreIfNotExists);
}
+ @Override
+ public void alterView(Identifier view, List<ViewChange> viewChanges,
boolean ignoreIfNotExists)
+ throws ViewNotExistException, DialectAlreadyExistException,
DialectNotExistException {
+ wrapped.alterView(view, viewChanges, ignoreIfNotExists);
+ }
+
@Override
public List<Partition> listPartitions(Identifier identifier) throws
TableNotExistException {
return wrapped.listPartitions(identifier);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptor.java
b/paimon-core/src/main/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptor.java
index dd16e47fc5..524080e42b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptor.java
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptor.java
@@ -122,19 +122,19 @@ public class ExponentialHttpRetryInterceptor implements
Interceptor {
|| (!response.isSuccessful() &&
retrievableCodes.contains(response.code()));
}
- public boolean needRetry(String method, IOException e, int execCount) {
+ public boolean needRetry(String method, IOException e, int execCount)
throws IOException {
if (execCount > maxRetries) {
- return false;
+ throw e;
}
if (!retrievableMethods.contains(method)) {
- return false;
+ throw e;
}
if (nonRetriableExceptions.contains(e.getClass())) {
- return false;
+ throw e;
} else {
for (Class<? extends IOException> rejectException :
nonRetriableExceptions) {
if (rejectException.isInstance(e)) {
- return false;
+ throw e;
}
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index be61cfd894..0b0020a461 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -44,6 +44,7 @@ import
org.apache.paimon.rest.exceptions.NotImplementedException;
import org.apache.paimon.rest.exceptions.ServiceFailureException;
import org.apache.paimon.rest.requests.AlterDatabaseRequest;
import org.apache.paimon.rest.requests.AlterTableRequest;
+import org.apache.paimon.rest.requests.AlterViewRequest;
import org.apache.paimon.rest.requests.CommitTableRequest;
import org.apache.paimon.rest.requests.CreateBranchRequest;
import org.apache.paimon.rest.requests.CreateDatabaseRequest;
@@ -80,6 +81,7 @@ import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.view.View;
+import org.apache.paimon.view.ViewChange;
import org.apache.paimon.view.ViewImpl;
import org.apache.paimon.view.ViewSchema;
@@ -899,6 +901,28 @@ public class RESTCatalog implements Catalog {
}
}
+ @Override
+ public void alterView(
+ Identifier identifier, List<ViewChange> viewChanges, boolean
ignoreIfNotExists)
+ throws ViewNotExistException, DialectAlreadyExistException,
DialectNotExistException {
+ try {
+ AlterViewRequest request = new AlterViewRequest(viewChanges);
+ client.post(
+ resourcePaths.view(identifier.getDatabaseName(),
identifier.getObjectName()),
+ request,
+ restAuthFunction);
+ } catch (AlreadyExistsException e) {
+ throw new DialectAlreadyExistException(identifier,
e.resourceName());
+ } catch (NoSuchResourceException e) {
+ if (StringUtils.equals(e.resourceType(),
ErrorResponse.RESOURCE_TYPE_DIALECT)) {
+ throw new DialectNotExistException(identifier,
e.resourceName());
+ }
+ if (!ignoreIfNotExists) {
+ throw new ViewNotExistException(identifier);
+ }
+ }
+ }
+
@Override
public boolean caseSensitive() {
return context.options().getOptional(CASE_SENSITIVE).orElse(true);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/requests/AlterViewRequest.java
b/paimon-core/src/main/java/org/apache/paimon/rest/requests/AlterViewRequest.java
new file mode 100644
index 0000000000..f31e903840
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/requests/AlterViewRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.requests;
+
+import org.apache.paimon.rest.RESTRequest;
+import org.apache.paimon.view.ViewChange;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/** Request for altering view. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AlterViewRequest implements RESTRequest {
+
+ private static final String FIELD_CHANGES = "changes";
+
+ @JsonProperty(FIELD_CHANGES)
+ private final List<ViewChange> viewChanges;
+
+ @JsonCreator
+ public AlterViewRequest(@JsonProperty(FIELD_CHANGES) List<ViewChange>
viewChanges) {
+ this.viewChanges = viewChanges;
+ }
+
+ @JsonGetter(FIELD_CHANGES)
+ public List<ViewChange> viewChanges() {
+ return viewChanges;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponse.java
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponse.java
index 04a9119d4f..b66459a5f0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponse.java
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponse.java
@@ -43,6 +43,8 @@ public class ErrorResponse implements RESTResponse {
public static final String RESOURCE_TYPE_VIEW = "VIEW";
+ public static final String RESOURCE_TYPE_DIALECT = "DIALECT";
+
private static final String FIELD_MESSAGE = "message";
private static final String FIELD_RESOURCE_TYPE = "resourceType";
private static final String FIELD_RESOURCE_NAME = "resourceName";
diff --git a/paimon-core/src/main/java/org/apache/paimon/view/ViewChange.java
b/paimon-core/src/main/java/org/apache/paimon/view/ViewChange.java
new file mode 100644
index 0000000000..da15a6032e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/view/ViewChange.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.view;
+
+import org.apache.paimon.annotation.Public;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Dialect change to view. */
+@Public
+@JsonTypeInfo(
+ use = JsonTypeInfo.Id.NAME,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = ViewChange.Actions.FIELD_TYPE)
+@JsonSubTypes({
+ @JsonSubTypes.Type(
+ value = ViewChange.SetViewOption.class,
+ name = ViewChange.Actions.SET_OPTION_ACTION),
+ @JsonSubTypes.Type(
+ value = ViewChange.RemoveViewOption.class,
+ name = ViewChange.Actions.REMOVE_OPTION_ACTION),
+ @JsonSubTypes.Type(
+ value = ViewChange.UpdateViewComment.class,
+ name = ViewChange.Actions.UPDATE_COMMENT_ACTION),
+ @JsonSubTypes.Type(
+ value = ViewChange.AddDialect.class,
+ name = ViewChange.Actions.ADD_DIALECT_ACTION),
+ @JsonSubTypes.Type(
+ value = ViewChange.UpdateDialect.class,
+ name = ViewChange.Actions.UPDATE_DIALECT_ACTION),
+ @JsonSubTypes.Type(
+ value = ViewChange.DropDialect.class,
+ name = ViewChange.Actions.DROP_DIALECT_ACTION)
+})
+public interface ViewChange extends Serializable {
+
+ static ViewChange setOption(String key, String value) {
+ return new ViewChange.SetViewOption(key, value);
+ }
+
+ static ViewChange removeOption(String key) {
+ return new ViewChange.RemoveViewOption(key);
+ }
+
+ static ViewChange updateComment(String comment) {
+ return new ViewChange.UpdateViewComment(comment);
+ }
+
+ static ViewChange addDialect(String dialect, String query) {
+ return new AddDialect(dialect, query);
+ }
+
+ static ViewChange updateDialect(String dialect, String query) {
+ return new UpdateDialect(dialect, query);
+ }
+
+ static ViewChange dropDialect(String dialect) {
+ return new DropDialect(dialect);
+ }
+
+ /** set a view option for view change. */
+ final class SetViewOption implements ViewChange {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String FIELD_KEY = "key";
+ private static final String FIELD_VALUE = "value";
+
+ @JsonProperty(FIELD_KEY)
+ private final String key;
+
+ @JsonProperty(FIELD_VALUE)
+ private final String value;
+
+ @JsonCreator
+ private SetViewOption(
+ @JsonProperty(FIELD_KEY) String key,
@JsonProperty(FIELD_VALUE) String value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ @JsonGetter(FIELD_KEY)
+ public String key() {
+ return key;
+ }
+
+ @JsonGetter(FIELD_VALUE)
+ public String value() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SetViewOption that = (SetViewOption) o;
+ return key.equals(that.key) && value.equals(that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, value);
+ }
+ }
+
+ /** remove a view option for view change. */
+ final class RemoveViewOption implements ViewChange {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String FIELD_KEY = "key";
+
+ @JsonProperty(FIELD_KEY)
+ private final String key;
+
+ private RemoveViewOption(@JsonProperty(FIELD_KEY) String key) {
+ this.key = key;
+ }
+
+ @JsonGetter(FIELD_KEY)
+ public String key() {
+ return key;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RemoveViewOption that = (RemoveViewOption) o;
+ return key.equals(that.key);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key);
+ }
+ }
+
+ /** update a view comment for view change. */
+ final class UpdateViewComment implements ViewChange {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String FIELD_COMMENT = "comment";
+
+ // If comment is null, means to remove comment
+ @JsonProperty(FIELD_COMMENT)
+ private final @Nullable String comment;
+
+ private UpdateViewComment(@JsonProperty(FIELD_COMMENT) @Nullable
String comment) {
+ this.comment = comment;
+ }
+
+ @JsonGetter(FIELD_COMMENT)
+ public @Nullable String comment() {
+ return comment;
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ UpdateViewComment that = (UpdateViewComment) object;
+ return Objects.equals(comment, that.comment);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(comment);
+ }
+ }
+
+ /** addDialect dialect for view change. */
+ final class AddDialect implements ViewChange {
+ private static final long serialVersionUID = 1L;
+ private static final String FIELD_DIALECT = "dialect";
+ private static final String FIELD_QUERY = "query";
+
+ @JsonProperty(FIELD_DIALECT)
+ private final String dialect;
+
+ @JsonProperty(FIELD_QUERY)
+ private final String query;
+
+ @JsonCreator
+ public AddDialect(
+ @JsonProperty(FIELD_DIALECT) String dialect,
+ @JsonProperty(FIELD_QUERY) String query) {
+ this.dialect = dialect;
+ this.query = query;
+ }
+
+ @JsonGetter(FIELD_DIALECT)
+ public String dialect() {
+ return dialect;
+ }
+
+ @JsonGetter(FIELD_QUERY)
+ public String query() {
+ return query;
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ AddDialect that = (AddDialect) object;
+ return Objects.equals(dialect, that.dialect) &&
Objects.equals(query, that.query);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dialect, query);
+ }
+ }
+
+ /** update dialect for view change. */
+ final class UpdateDialect implements ViewChange {
+ private static final long serialVersionUID = 1L;
+ private static final String FIELD_DIALECT = "dialect";
+ private static final String FIELD_QUERY = "query";
+
+ @JsonProperty(FIELD_DIALECT)
+ private final String dialect;
+
+ @JsonProperty(FIELD_QUERY)
+ private final String query;
+
+ @JsonCreator
+ public UpdateDialect(
+ @JsonProperty(FIELD_DIALECT) String dialect,
+ @JsonProperty(FIELD_QUERY) String query) {
+ this.dialect = dialect;
+ this.query = query;
+ }
+
+ @JsonGetter(FIELD_DIALECT)
+ public String dialect() {
+ return dialect;
+ }
+
+ @JsonGetter(FIELD_QUERY)
+ public String query() {
+ return query;
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ UpdateDialect that = (UpdateDialect) object;
+ return Objects.equals(dialect, that.dialect) &&
Objects.equals(query, that.query);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dialect, query);
+ }
+ }
+
+ /** drop dialect for view change. */
+ final class DropDialect implements ViewChange {
+ private static final long serialVersionUID = 1L;
+ private static final String FIELD_DIALECT = "dialect";
+
+ @JsonProperty(FIELD_DIALECT)
+ private final String dialect;
+
+ @JsonCreator
+ public DropDialect(@JsonProperty(FIELD_DIALECT) String dialect) {
+ this.dialect = dialect;
+ }
+
+ @JsonGetter(FIELD_DIALECT)
+ public String dialect() {
+ return dialect;
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ DropDialect that = (DropDialect) object;
+ return Objects.equals(dialect, that.dialect);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dialect);
+ }
+ }
+
+ /** Actions for view alter. */
+ class Actions {
+ public static final String FIELD_TYPE = "action";
+ public static final String ADD_DIALECT_ACTION = "addDialect";
+ public static final String UPDATE_DIALECT_ACTION = "updateDialect";
+ public static final String DROP_DIALECT_ACTION = "dropDialect";
+ public static final String SET_OPTION_ACTION = "setOption";
+ public static final String REMOVE_OPTION_ACTION = "removeOption";
+ public static final String UPDATE_COMMENT_ACTION = "updateComment";
+
+ private Actions() {}
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 49d6f623c1..2c7f8812ac 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -1048,26 +1048,8 @@ public abstract class CatalogTestBase {
if (!supportsView()) {
return;
}
-
Identifier identifier = new Identifier("view_db", "my_view");
- RowType rowType =
- RowType.builder()
- .field("str", DataTypes.STRING())
- .field("int", DataTypes.INT())
- .build();
- String query = "SELECT * FROM OTHER_TABLE";
- String comment = "it is my view";
- Map<String, String> options = new HashMap<>();
- options.put("key1", "v1");
- options.put("key2", "v2");
-
- Map<String, String> dialects = new HashMap<>();
- if (supportsViewDialects()) {
- dialects.put("flink", "SELECT * FROM FLINK_TABLE");
- dialects.put("spark", "SELECT * FROM SPARK_TABLE");
- }
- View view =
- new ViewImpl(identifier, rowType.getFields(), query, dialects,
comment, options);
+ View view = createView(identifier);
assertThatThrownBy(() -> catalog.createView(identifier, view, false))
.isInstanceOf(Catalog.DatabaseNotExistException.class);
@@ -1568,4 +1550,24 @@ public abstract class CatalogTestBase {
options.put("type", "format-table");
return options;
}
+
+ protected View createView(Identifier identifier) {
+ RowType rowType =
+ RowType.builder()
+ .field("str", DataTypes.STRING())
+ .field("int", DataTypes.INT())
+ .build();
+ String query = "SELECT * FROM OTHER_TABLE";
+ String comment = "it is my view";
+ Map<String, String> options = new HashMap<>();
+ options.put("key1", "v1");
+ options.put("key2", "v2");
+
+ Map<String, String> dialects = new HashMap<>();
+ if (supportsViewDialects()) {
+ dialects.put("flink", "SELECT * FROM FLINK_TABLE");
+ dialects.put("spark", "SELECT * FROM SPARK_TABLE");
+ }
+ return new ViewImpl(identifier, rowType.getFields(), query, dialects,
comment, options);
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptorTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptorTest.java
index 6510371f2d..02e285b7e7 100644
---
a/paimon-core/src/test/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptorTest.java
@@ -33,6 +33,7 @@ import java.net.NoRouteToHostException;
import java.net.UnknownHostException;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link ExponentialHttpRetryInterceptor}. */
class ExponentialHttpRetryInterceptorTest {
@@ -42,7 +43,7 @@ class ExponentialHttpRetryInterceptorTest {
new ExponentialHttpRetryInterceptor(maxRetries);
@Test
- void testNeedRetryByMethod() {
+ void testNeedRetryByMethod() throws IOException {
assertThat(interceptor.needRetry("GET", new IOException(),
1)).isTrue();
assertThat(interceptor.needRetry("HEAD", new IOException(),
1)).isTrue();
@@ -50,25 +51,38 @@ class ExponentialHttpRetryInterceptorTest {
assertThat(interceptor.needRetry("DELETE", new IOException(),
1)).isTrue();
assertThat(interceptor.needRetry("TRACE", new IOException(),
1)).isTrue();
assertThat(interceptor.needRetry("OPTIONS", new IOException(),
1)).isTrue();
-
- assertThat(interceptor.needRetry("POST", new IOException(),
1)).isFalse();
- assertThat(interceptor.needRetry("PATCH", new IOException(),
1)).isFalse();
- assertThat(interceptor.needRetry("CONNECT", new IOException(),
1)).isFalse();
- assertThat(interceptor.needRetry("GET", new IOException(), maxRetries
+ 1)).isFalse();
+ assertThatThrownBy(() -> interceptor.needRetry("POST", new
IOException(), 1))
+ .isInstanceOf(IOException.class);
+ assertThatThrownBy(() -> interceptor.needRetry("POST", new
IOException(), 1))
+ .isInstanceOf(IOException.class);
+ assertThatThrownBy(() -> interceptor.needRetry("PATCH", new
IOException(), 1))
+ .isInstanceOf(IOException.class);
+ assertThatThrownBy(() -> interceptor.needRetry("CONNECT", new
IOException(), 1))
+ .isInstanceOf(IOException.class);
+ assertThatThrownBy(() -> interceptor.needRetry("GET", new
IOException(), maxRetries + 1))
+ .isInstanceOf(IOException.class);
}
@Test
- void testNeedRetryByException() {
-
- assertThat(interceptor.needRetry("GET", new InterruptedIOException(),
1)).isFalse();
- assertThat(interceptor.needRetry("GET", new UnknownHostException(),
1)).isFalse();
- assertThat(interceptor.needRetry("GET", new ConnectException(),
1)).isFalse();
- assertThat(interceptor.needRetry("GET", new NoRouteToHostException(),
1)).isFalse();
- assertThat(interceptor.needRetry("GET", new SSLException("error"),
1)).isFalse();
+ void testNeedRetryByException() throws IOException {
+
+ assertThatThrownBy(() -> interceptor.needRetry("GET", new
InterruptedIOException(), 1))
+ .isInstanceOf(InterruptedIOException.class);
+ assertThatThrownBy(() -> interceptor.needRetry("GET", new
UnknownHostException(), 1))
+ .isInstanceOf(UnknownHostException.class);
+ assertThatThrownBy(() -> interceptor.needRetry("GET", new
ConnectException(), 1))
+ .isInstanceOf(ConnectException.class);
+ assertThatThrownBy(() -> interceptor.needRetry("GET", new
NoRouteToHostException(), 1))
+ .isInstanceOf(NoRouteToHostException.class);
+ assertThatThrownBy(() -> interceptor.needRetry("GET", new
SSLException("error"), 1))
+ .isInstanceOf(SSLException.class);
assertThat(interceptor.needRetry("GET", new IOException("error"),
1)).isTrue();
- assertThat(interceptor.needRetry("GET", new IOException("error"),
maxRetries + 1))
- .isFalse();
+ assertThatThrownBy(
+ () ->
+ interceptor.needRetry(
+ "GET", new IOException("error"),
maxRetries + 1))
+ .isInstanceOf(IOException.class);
}
@Test
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
index 0d5d46c550..b5155c58f1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
@@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.rest.requests.AlterDatabaseRequest;
import org.apache.paimon.rest.requests.AlterTableRequest;
+import org.apache.paimon.rest.requests.AlterViewRequest;
import org.apache.paimon.rest.requests.CreateDatabaseRequest;
import org.apache.paimon.rest.requests.CreateTableRequest;
import org.apache.paimon.rest.requests.CreateViewRequest;
@@ -44,6 +45,7 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.view.ViewChange;
import org.apache.paimon.view.ViewSchema;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
@@ -271,6 +273,17 @@ public class MockRESTMessage {
return new RollbackTableRequest(Instant.tag(tagName));
}
+ public static AlterViewRequest alterViewRequest() {
+ List<ViewChange> viewChanges = new ArrayList<>();
+ viewChanges.add(ViewChange.setOption("key", "value"));
+ viewChanges.add(ViewChange.removeOption("key"));
+ viewChanges.add(ViewChange.updateComment("comment"));
+ viewChanges.add(ViewChange.addDialect("dialect", "query"));
+ viewChanges.add(ViewChange.updateDialect("dialect", "query"));
+ viewChanges.add(ViewChange.dropDialect("dialect"));
+ return new AlterViewRequest(viewChanges);
+ }
+
private static ViewSchema viewSchema() {
List<DataField> fields =
Arrays.asList(
@@ -284,10 +297,6 @@ public class MockRESTMessage {
Collections.singletonMap("pt", "1"));
}
- private static Partition partition() {
- return new Partition(Collections.singletonMap("pt", "1"), 1, 1, 1, 1,
false);
- }
-
private static Schema schema(Map<String, String> options) {
List<DataField> fields =
Arrays.asList(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index fd1497d03d..129448f861 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -42,6 +42,7 @@ import org.apache.paimon.rest.auth.AuthProvider;
import org.apache.paimon.rest.auth.RESTAuthParameter;
import org.apache.paimon.rest.requests.AlterDatabaseRequest;
import org.apache.paimon.rest.requests.AlterTableRequest;
+import org.apache.paimon.rest.requests.AlterViewRequest;
import org.apache.paimon.rest.requests.CommitTableRequest;
import org.apache.paimon.rest.requests.CreateBranchRequest;
import org.apache.paimon.rest.requests.CreateDatabaseRequest;
@@ -77,6 +78,7 @@ import org.apache.paimon.table.TableSnapshot;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.view.View;
+import org.apache.paimon.view.ViewChange;
import org.apache.paimon.view.ViewImpl;
import org.apache.paimon.view.ViewSchema;
@@ -352,10 +354,7 @@ public class RESTCatalogServer {
return new MockResponse().setResponseCode(200);
} else if (isPartitions) {
return partitionsApiHandle(
- restAuthParameter.method(),
- restAuthParameter.data(),
- parameters,
- identifier);
+ restAuthParameter.method(), parameters,
identifier);
} else if (isBranches) {
return branchApiHandle(
resources,
@@ -411,7 +410,10 @@ public class RESTCatalogServer {
return viewDetailsHandle(
restAuthParameter.method(), databaseName,
parameters);
} else if (isView) {
- return viewHandle(restAuthParameter.method(),
identifier);
+ return viewHandle(
+ restAuthParameter.method(),
+ identifier,
+ restAuthParameter.data());
} else {
return databaseHandle(
restAuthParameter.method(),
@@ -492,6 +494,14 @@ public class RESTCatalogServer {
e.getMessage(),
404);
return mockResponse(response, 404);
+ } catch (Catalog.DialectNotExistException e) {
+ response =
+ new ErrorResponse(
+ ErrorResponse.RESOURCE_TYPE_DIALECT,
+ e.dialect(),
+ e.getMessage(),
+ 404);
+ return mockResponse(response, 404);
} catch (Catalog.ViewAlreadyExistException e) {
response =
new ErrorResponse(
@@ -500,6 +510,14 @@ public class RESTCatalogServer {
e.getMessage(),
409);
return mockResponse(response, 409);
+ } catch (Catalog.DialectAlreadyExistException e) {
+ response =
+ new ErrorResponse(
+ ErrorResponse.RESOURCE_TYPE_DIALECT,
+ e.dialect(),
+ e.getMessage(),
+ 409);
+ return mockResponse(response, 409);
} catch (IllegalArgumentException e) {
response = new ErrorResponse(null, null, e.getMessage(),
400);
return mockResponse(response, 400);
@@ -1026,7 +1044,7 @@ public class RESTCatalogServer {
}
private MockResponse partitionsApiHandle(
- String method, String data, Map<String, String> parameters,
Identifier tableIdentifier)
+ String method, Map<String, String> parameters, Identifier
tableIdentifier)
throws Exception {
switch (method) {
case "GET":
@@ -1301,7 +1319,8 @@ public class RESTCatalogServer {
.collect(Collectors.toList());
}
- private MockResponse viewHandle(String method, Identifier identifier)
throws Exception {
+ private MockResponse viewHandle(String method, Identifier identifier,
String requestData)
+ throws Exception {
RESTResponse response;
if (viewStore.containsKey(identifier.getFullName())) {
switch (method) {
@@ -1331,6 +1350,71 @@ public class RESTCatalogServer {
case "DELETE":
viewStore.remove(identifier.getFullName());
return new MockResponse().setResponseCode(200);
+ case "POST":
+ if (viewStore.containsKey(identifier.getFullName())) {
+ AlterViewRequest request =
+ OBJECT_MAPPER.readValue(requestData,
AlterViewRequest.class);
+ ViewImpl view = (ViewImpl)
viewStore.get(identifier.getFullName());
+ HashMap<String, String> newDialects = new
HashMap<>(view.dialects());
+ Map<String, String> newOptions = new
HashMap<>(view.options());
+ String newComment = view.comment().orElse(null);
+ for (ViewChange viewChange : request.viewChanges()) {
+ if (viewChange instanceof
ViewChange.SetViewOption) {
+ ViewChange.SetViewOption setViewOption =
+ (ViewChange.SetViewOption) viewChange;
+ newOptions.put(setViewOption.key(),
setViewOption.value());
+
+ } else if (viewChange instanceof
ViewChange.RemoveViewOption) {
+ ViewChange.RemoveViewOption removeViewOption =
+ (ViewChange.RemoveViewOption)
viewChange;
+ newOptions.remove(removeViewOption.key());
+ } else if (viewChange instanceof
ViewChange.UpdateViewComment) {
+ ViewChange.UpdateViewComment updateViewComment
=
+ (ViewChange.UpdateViewComment)
viewChange;
+ newComment = updateViewComment.comment();
+ } else if (viewChange instanceof
ViewChange.AddDialect) {
+ ViewChange.AddDialect addDialect =
+ (ViewChange.AddDialect) viewChange;
+ if
(view.dialects().containsKey(addDialect.dialect())) {
+
+ throw new
Catalog.DialectAlreadyExistException(
+ identifier, addDialect.dialect());
+ } else {
+ newDialects.put(addDialect.dialect(),
addDialect.query());
+ }
+ } else if (viewChange instanceof
ViewChange.UpdateDialect) {
+ ViewChange.UpdateDialect updateDialect =
+ (ViewChange.UpdateDialect) viewChange;
+ if
(view.dialects().containsKey(updateDialect.dialect())) {
+ newDialects.put(updateDialect.dialect(),
updateDialect.query());
+ } else {
+ throw new Catalog.DialectNotExistException(
+ identifier,
updateDialect.dialect());
+ }
+ } else if (viewChange instanceof
ViewChange.DropDialect) {
+ ViewChange.DropDialect dropDialect =
+ (ViewChange.DropDialect) viewChange;
+ if
(view.dialects().containsKey(dropDialect.dialect())) {
+ newDialects.remove(dropDialect.dialect());
+ } else {
+ throw new Catalog.DialectNotExistException(
+ identifier, dropDialect.dialect());
+ }
+ }
+ }
+ view =
+ new ViewImpl(
+ identifier,
+ view.rowType().getFields(),
+ view.query(),
+ newDialects,
+ newComment,
+ newOptions);
+ viewStore.put(identifier.getFullName(), view);
+ return new MockResponse().setResponseCode(200);
+ } else {
+ throw new Catalog.ViewNotExistException(identifier);
+ }
default:
return new MockResponse().setResponseCode(404);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index f7d734e05c..920400c53a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -50,6 +50,7 @@ import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.view.View;
+import org.apache.paimon.view.ViewChange;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
@@ -82,6 +83,7 @@ import static
org.apache.paimon.rest.auth.DLFAuthProvider.TOKEN_DATE_FORMATTER;
import static
org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -1053,6 +1055,78 @@ public abstract class RESTCatalogTest extends
CatalogTestBase {
assertThat(fetchData).containsSequence(testData);
}
+ @Test
+ void testAlterView() throws Exception {
+ Identifier identifier = new Identifier("rest_catalog_db", "my_view");
+ View view = createView(identifier);
+ catalog.createDatabase(identifier.getDatabaseName(), false);
+ ViewChange.AddDialect addDialect =
+ (ViewChange.AddDialect)
+ ViewChange.addDialect("flink_1", "SELECT * FROM
FLINK_TABLE_1");
+ assertDoesNotThrow(() -> catalog.alterView(identifier,
ImmutableList.of(addDialect), true));
+ assertThrows(
+ Catalog.ViewNotExistException.class,
+ () -> catalog.alterView(identifier,
ImmutableList.of(addDialect), false));
+ catalog.createView(identifier, view, false);
+ // set options
+ String key = UUID.randomUUID().toString();
+ String value = UUID.randomUUID().toString();
+ ViewChange setOption = ViewChange.setOption(key, value);
+ catalog.alterView(identifier, ImmutableList.of(setOption), false);
+ View catalogView = catalog.getView(identifier);
+ assertThat(catalogView.options().get(key)).isEqualTo(value);
+
+ // remove options
+ catalog.alterView(identifier,
ImmutableList.of(ViewChange.removeOption(key)), false);
+ catalogView = catalog.getView(identifier);
+ assertThat(catalogView.options().containsKey(key)).isEqualTo(false);
+
+ // update comment
+ String newComment = "new comment";
+ catalog.alterView(
+ identifier,
ImmutableList.of(ViewChange.updateComment(newComment)), false);
+ catalogView = catalog.getView(identifier);
+ assertThat(catalogView.comment().get()).isEqualTo(newComment);
+ // add dialect
+ catalog.alterView(identifier, ImmutableList.of(addDialect), false);
+ catalogView = catalog.getView(identifier);
+
assertThat(catalogView.query(addDialect.dialect())).isEqualTo(addDialect.query());
+ assertThrows(
+ Catalog.DialectAlreadyExistException.class,
+ () -> catalog.alterView(identifier,
ImmutableList.of(addDialect), false));
+
+ // update dialect
+ ViewChange.UpdateDialect updateDialect =
+ (ViewChange.UpdateDialect)
+ ViewChange.updateDialect("flink_1", "SELECT * FROM
FLINK_TABLE_2");
+ catalog.alterView(identifier, ImmutableList.of(updateDialect), false);
+ catalogView = catalog.getView(identifier);
+
assertThat(catalogView.query(updateDialect.dialect())).isEqualTo(updateDialect.query());
+ assertThrows(
+ Catalog.DialectNotExistException.class,
+ () ->
+ catalog.alterView(
+ identifier,
+ ImmutableList.of(
+ ViewChange.updateDialect(
+ "no_exist", "SELECT * FROM
FLINK_TABLE_2")),
+ false));
+
+ // drop dialect
+ ViewChange.DropDialect dropDialect =
+ (ViewChange.DropDialect)
ViewChange.dropDialect(updateDialect.dialect());
+ catalog.alterView(identifier, ImmutableList.of(dropDialect), false);
+ catalogView = catalog.getView(identifier);
+
assertThat(catalogView.query(dropDialect.dialect())).isEqualTo(catalogView.query());
+ assertThrows(
+ Catalog.DialectNotExistException.class,
+ () ->
+ catalog.alterView(
+ identifier,
+
ImmutableList.of(ViewChange.dropDialect("no_exist")),
+ false));
+ }
+
private TestPagedResponse generateTestPagedResponse(
Map<String, String> queryParams,
List<Integer> testData,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
index 974837c69a..c395d2e303 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.rest;
import org.apache.paimon.rest.requests.AlterDatabaseRequest;
import org.apache.paimon.rest.requests.AlterTableRequest;
+import org.apache.paimon.rest.requests.AlterViewRequest;
import org.apache.paimon.rest.requests.CreateDatabaseRequest;
import org.apache.paimon.rest.requests.CreateTableRequest;
import org.apache.paimon.rest.requests.CreateViewRequest;
@@ -268,4 +269,15 @@ public class RESTObjectMapperTest {
.getInstant();
assertEquals(rollbackTableRequestByTagParseData.getTagName(), tagName);
}
+
+ @Test
+ public void alterViewRequestParseTest() throws Exception {
+ AlterViewRequest request = MockRESTMessage.alterViewRequest();
+ String requestStr = OBJECT_MAPPER.writeValueAsString(request);
+ AlterViewRequest parseData = OBJECT_MAPPER.readValue(requestStr,
AlterViewRequest.class);
+ assertEquals(parseData.viewChanges().size(),
request.viewChanges().size());
+ for (int i = 0; i < request.viewChanges().size(); i++) {
+ assertEquals(parseData.viewChanges().get(i),
request.viewChanges().get(i));
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index f45b6c9240..ed4f3b8242 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -167,6 +167,8 @@ import static
org.apache.paimon.utils.Preconditions.checkNotNull;
/** Catalog for paimon. */
public class FlinkCatalog extends AbstractCatalog {
+ public static final String DIALECT = "flink";
+
private static final Logger LOG =
LoggerFactory.getLogger(FlinkCatalog.class);
private final ClassLoader classLoader;
@@ -346,7 +348,7 @@ public class FlinkCatalog extends AbstractCatalog {
org.apache.flink.table.api.Schema.newBuilder()
.fromRowDataType(fromLogicalToDataType(toLogicalType(view.rowType())))
.build();
- String query = view.query("flink");
+ String query = view.query(DIALECT);
return Optional.of(
CatalogView.of(schema, view.comment().orElse(null), query,
query, view.options()));
}
@@ -456,7 +458,7 @@ public class FlinkCatalog extends AbstractCatalog {
identifier,
builder.build().getFields(),
query,
- Collections.singletonMap("flink", query),
+ Collections.singletonMap(DIALECT, query),
table.getComment(),
table.getOptions());
try {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterViewDialectProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterViewDialectProcedure.java
new file mode 100644
index 0000000000..cd2a0172ef
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterViewDialectProcedure.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.utils.StringUtils;
+import org.apache.paimon.view.ViewChange;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import static org.apache.paimon.flink.FlinkCatalog.DIALECT;
+
+/**
+ * alter view procedure. Usage:
+ *
+ * <pre><code>
+ * -- NOTE: use '' as placeholder for optional arguments
+ *
+ * -- add dialect in the view
+ * CALL sys.alter_view_dialect('view_identifier', 'add', 'flink', 'query')
+ * CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` =>
'add', `query` => 'query')
+ *
+ * -- update dialect in the view
+ * CALL sys.alter_view_dialect('view_identifier', 'update', 'flink', 'query')
+ * CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` =>
'update', `query` => 'query')
+ *
+ * -- drop dialect in the view
+ * CALL sys.alter_view_dialect('view_identifier', 'drop', 'flink')
+ * CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` =>
'drop')
+ *
+ * </code></pre>
+ */
+public class AlterViewDialectProcedure extends ProcedureBase {
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "view", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "action", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "engine", type = @DataTypeHint("STRING"),
isOptional = true),
+ @ArgumentHint(name = "query", type = @DataTypeHint("STRING"),
isOptional = true)
+ })
+ public String[] call(
+ ProcedureContext procedureContext,
+ String view,
+ String action,
+ String engine,
+ String query)
+ throws Catalog.ViewNotExistException,
Catalog.DialectAlreadyExistException,
+ Catalog.DialectNotExistException {
+ Identifier identifier = Identifier.fromString(view);
+ ViewChange viewChange;
+ String dialect = StringUtils.isNullOrWhitespaceOnly(engine) ? DIALECT
: engine;
+ switch (action) {
+ case "add":
+ {
+ if (StringUtils.isNullOrWhitespaceOnly(query)) {
+ throw new IllegalArgumentException("query is required
for add action.");
+ }
+ viewChange = ViewChange.addDialect(dialect, query);
+ break;
+ }
+ case "update":
+ {
+ if (StringUtils.isNullOrWhitespaceOnly(query)) {
+ throw new IllegalArgumentException("query is required
for update action.");
+ }
+ viewChange = ViewChange.updateDialect(dialect, query);
+ break;
+ }
+ case "drop":
+ {
+ viewChange = ViewChange.dropDialect(dialect);
+ break;
+ }
+ default:
+ {
+ throw new IllegalArgumentException("Unsupported action: "
+ action);
+ }
+ }
+ catalog.alterView(identifier, ImmutableList.of(viewChange), false);
+ return new String[] {"Success"};
+ }
+
+ @Override
+ public String identifier() {
+ return "alter_view_dialect";
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 184706854c..c6ae6b2595 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -89,3 +89,4 @@
org.apache.paimon.flink.procedure.RemoveUnexistingFilesProcedure
org.apache.paimon.flink.procedure.ClearConsumersProcedure
org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure
org.apache.paimon.flink.procedure.RescaleProcedure
+org.apache.paimon.flink.procedure.AlterViewDialectProcedure
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
index 36efa4817c..3ec704a10c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
@@ -19,79 +19,21 @@
package org.apache.paimon.flink;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.options.CatalogOptions;
-import org.apache.paimon.rest.RESTCatalogInternalOptions;
-import org.apache.paimon.rest.RESTCatalogOptions;
-import org.apache.paimon.rest.RESTCatalogServer;
-import org.apache.paimon.rest.RESTFileIOTestLoader;
-import org.apache.paimon.rest.RESTTestFileIO;
import org.apache.paimon.rest.RESTToken;
-import org.apache.paimon.rest.RESTTokenFileIO;
-import org.apache.paimon.rest.auth.AuthProvider;
-import org.apache.paimon.rest.auth.AuthProviderEnum;
-import org.apache.paimon.rest.auth.BearTokenAuthProvider;
-import org.apache.paimon.rest.responses.ConfigResponse;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.flink.types.Row;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-import java.io.IOException;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
/** ITCase for REST catalog. */
-class RESTCatalogITCase extends CatalogITCaseBase {
-
- private static final String DATABASE_NAME = "mydb";
- private static final String TABLE_NAME = "t1";
-
- private RESTCatalogServer restCatalogServer;
- private String serverUrl;
- private String dataPath;
- private String warehouse;
- @TempDir java.nio.file.Path tempFile;
-
- @BeforeEach
- @Override
- public void before() throws IOException {
- String initToken = "init_token";
- dataPath = tempFile.toUri().toString();
- warehouse = UUID.randomUUID().toString();
- ConfigResponse config =
- new ConfigResponse(
- ImmutableMap.of(
- RESTCatalogInternalOptions.PREFIX.key(),
- "paimon",
- RESTTokenFileIO.DATA_TOKEN_ENABLED.key(),
- "true",
- CatalogOptions.WAREHOUSE.key(),
- warehouse),
- ImmutableMap.of());
- AuthProvider authProvider = new BearTokenAuthProvider(initToken);
- restCatalogServer = new RESTCatalogServer(dataPath, authProvider,
config, warehouse);
- restCatalogServer.start();
- serverUrl = restCatalogServer.getUrl();
- super.before();
- sql(String.format("CREATE DATABASE %s", DATABASE_NAME));
- sql(String.format("CREATE TABLE %s.%s (a STRING, b DOUBLE)",
DATABASE_NAME, TABLE_NAME));
- }
-
- @AfterEach()
- public void after() throws IOException {
- sql(String.format("DROP TABLE %s.%s", DATABASE_NAME, TABLE_NAME));
- sql(String.format("DROP DATABASE %s", DATABASE_NAME));
- restCatalogServer.shutdown();
- }
+class RESTCatalogITCase extends RESTCatalogITCaseBase {
@Test
void testCreateTable() {
@@ -160,30 +102,4 @@ class RESTCatalogITCase extends CatalogITCaseBase {
assertThat(batchSql(String.format("SELECT * FROM %s.%s",
DATABASE_NAME, TABLE_NAME)))
.containsExactlyInAnyOrder(Row.of("1", 11.0D), Row.of("2",
22.0D));
}
-
- @Override
- protected Map<String, String> catalogOptions() {
- String initToken = "init_token";
- Map<String, String> options = new HashMap<>();
- options.put("metastore", "rest");
- options.put(CatalogOptions.WAREHOUSE.key(), warehouse);
- options.put(RESTCatalogOptions.URI.key(), serverUrl);
- options.put(RESTCatalogOptions.TOKEN.key(), initToken);
- options.put(RESTCatalogOptions.TOKEN_PROVIDER.key(),
AuthProviderEnum.BEAR.identifier());
- options.put(RESTTokenFileIO.DATA_TOKEN_ENABLED.key(), "true");
- options.put(
- RESTTestFileIO.DATA_PATH_CONF_KEY,
- dataPath.replaceFirst("file", RESTFileIOTestLoader.SCHEME));
- return options;
- }
-
- @Override
- protected String getTempDirPath() {
- return this.dataPath;
- }
-
- @Override
- protected boolean supportDefineWarehouse() {
- return false;
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java
new file mode 100644
index 0000000000..5c214568ee
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.rest.RESTCatalogInternalOptions;
+import org.apache.paimon.rest.RESTCatalogOptions;
+import org.apache.paimon.rest.RESTCatalogServer;
+import org.apache.paimon.rest.RESTFileIOTestLoader;
+import org.apache.paimon.rest.RESTTestFileIO;
+import org.apache.paimon.rest.RESTTokenFileIO;
+import org.apache.paimon.rest.auth.AuthProvider;
+import org.apache.paimon.rest.auth.AuthProviderEnum;
+import org.apache.paimon.rest.auth.BearTokenAuthProvider;
+import org.apache.paimon.rest.responses.ConfigResponse;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/** Base ITCase for REST catalog. */
+public abstract class RESTCatalogITCaseBase extends CatalogITCaseBase {
+
+ protected static final String DATABASE_NAME = "mydb";
+ protected static final String TABLE_NAME = "t1";
+
+ protected RESTCatalogServer restCatalogServer;
+
+ private String serverUrl;
+ private String dataPath;
+ private String warehouse;
+ @TempDir java.nio.file.Path tempFile;
+
+ @BeforeEach
+ @Override
+ public void before() throws IOException {
+ String initToken = "init_token";
+ dataPath = tempFile.toUri().toString();
+ warehouse = UUID.randomUUID().toString();
+ ConfigResponse config =
+ new ConfigResponse(
+ ImmutableMap.of(
+ RESTCatalogInternalOptions.PREFIX.key(),
+ "paimon",
+ RESTTokenFileIO.DATA_TOKEN_ENABLED.key(),
+ "true",
+ CatalogOptions.WAREHOUSE.key(),
+ warehouse),
+ ImmutableMap.of());
+ AuthProvider authProvider = new BearTokenAuthProvider(initToken);
+ restCatalogServer = new RESTCatalogServer(dataPath, authProvider,
config, warehouse);
+ restCatalogServer.start();
+ serverUrl = restCatalogServer.getUrl();
+ super.before();
+ sql(String.format("CREATE DATABASE %s", DATABASE_NAME));
+ sql(String.format("CREATE TABLE %s.%s (a STRING, b DOUBLE)",
DATABASE_NAME, TABLE_NAME));
+ }
+
+ @Override
+ protected Map<String, String> catalogOptions() {
+ String initToken = "init_token";
+ Map<String, String> options = new HashMap<>();
+ options.put("metastore", "rest");
+ options.put(CatalogOptions.WAREHOUSE.key(), warehouse);
+ options.put(RESTCatalogOptions.URI.key(), serverUrl);
+ options.put(RESTCatalogOptions.TOKEN.key(), initToken);
+ options.put(RESTCatalogOptions.TOKEN_PROVIDER.key(),
AuthProviderEnum.BEAR.identifier());
+ options.put(RESTTokenFileIO.DATA_TOKEN_ENABLED.key(), "true");
+ options.put(
+ RESTTestFileIO.DATA_PATH_CONF_KEY,
+ dataPath.replaceFirst("file", RESTFileIOTestLoader.SCHEME));
+ return options;
+ }
+
+ @Override
+ protected String getTempDirPath() {
+ return this.dataPath;
+ }
+
+ @Override
+ protected boolean supportDefineWarehouse() {
+ return false;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/AlterViewDialectITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/AlterViewDialectITCase.java
new file mode 100644
index 0000000000..dcefde8d9b
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/AlterViewDialectITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.FlinkCatalog;
+import org.apache.paimon.flink.RESTCatalogITCaseBase;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case for {@link AlterViewDialectProcedure}. */
+public class AlterViewDialectITCase extends RESTCatalogITCaseBase {
+
+ @Test
+ public void testAlterViewDialect() {
+
+ sql(
+ String.format(
+ "INSERT INTO %s.%s VALUES ('1', 11), ('2', 22)",
+ DATABASE_NAME, TABLE_NAME));
+ String viewName = "view_test";
+ String query =
+ String.format("SELECT * FROM `%s`.`%s` WHERE `b` > 1",
DATABASE_NAME, TABLE_NAME);
+ sql(String.format("CREATE VIEW %s.%s AS %s", DATABASE_NAME, viewName,
query));
+ String newQuery =
+ String.format("SELECT * FROM `%s`.`%s` WHERE `b` > 2",
DATABASE_NAME, TABLE_NAME);
+
+ List<Row> result =
+ sql(
+ String.format(
+ "CALL sys.alter_view_dialect('%s.%s',
'update', '%s', '%s')",
+ DATABASE_NAME, viewName, FlinkCatalog.DIALECT,
newQuery));
+ assertThat(result.toString()).contains("Success");
+ result = sql(String.format("SHOW CREATE VIEW %s.%s", DATABASE_NAME,
viewName));
+ assertThat(result.toString()).contains(newQuery);
+
+ result =
+ sql(
+ String.format(
+ "CALL sys.alter_view_dialect('%s.%s', 'drop',
'%s')",
+ DATABASE_NAME, viewName,
FlinkCatalog.DIALECT));
+ assertThat(result.toString()).contains("Success");
+
+ result =
+ sql(
+ String.format(
+ "CALL sys.alter_view_dialect('%s.%s', 'add',
'%s', '%s')",
+ DATABASE_NAME, viewName, FlinkCatalog.DIALECT,
query));
+ assertThat(result.toString()).contains("Success");
+ result = sql(String.format("SHOW CREATE VIEW %s.%s", DATABASE_NAME,
viewName));
+ assertThat(result.toString()).contains(query);
+
+ sql(
+ String.format(
+ "CALL sys.alter_view_dialect(`view` => '%s.%s',
`action` => 'update', `query` => '%s')",
+ DATABASE_NAME, viewName, newQuery));
+ result = sql(String.format("SHOW CREATE VIEW %s.%s", DATABASE_NAME,
viewName));
+ assertThat(result.toString()).contains(newQuery);
+ sql(
+ String.format(
+ "CALL sys.alter_view_dialect(`view` => '%s.%s',
`action` => 'drop')",
+ DATABASE_NAME, viewName));
+ result = sql(String.format("SHOW CREATE VIEW %s.%s", DATABASE_NAME,
viewName));
+ assertThat(result.toString()).contains("`b` > 1");
+ sql(
+ String.format(
+ "CALL sys.alter_view_dialect(`view` => '%s.%s',
`action` => 'add', `query` => '%s')",
+ DATABASE_NAME, viewName, query));
+ result = sql(String.format("SHOW CREATE VIEW %s.%s", DATABASE_NAME,
viewName));
+ assertThat(result.toString()).contains(query);
+ }
+}
diff --git a/paimon-open-api/rest-catalog-open-api.yaml
b/paimon-open-api/rest-catalog-open-api.yaml
index 44b1a63385..26b9f1234a 100644
--- a/paimon-open-api/rest-catalog-open-api.yaml
+++ b/paimon-open-api/rest-catalog-open-api.yaml
@@ -1309,6 +1309,53 @@ paths:
$ref: '#/components/schemas/ErrorResponse'
"500":
description: Internal Server Error
+ post:
+ tags:
+ - view
+ summary: Alter view
+ operationId: alterView
+ parameters:
+ - name: prefix
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: database
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: view
+ in: path
+ required: true
+ schema:
+ type: string
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/AlterViewRequest'
+ responses:
+ "200":
+ description: Success, no content
+ "401":
+ description: Unauthorized
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ErrorResponse'
+ "404":
+ description: Resource not found
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ErrorResponse'
+ "500":
+ description: Internal Server Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ErrorResponse'
delete:
tags:
- view
@@ -1521,7 +1568,7 @@ components:
type: string
resourceType:
type: string
- enum: ["DATABASE", "TABLE", "COLUMN", "SNAPSHOT", "BRANCH", "TAG",
"VIEW", "UNKNOWN"]
+ enum: ["DATABASE", "TABLE", "COLUMN", "SNAPSHOT", "BRANCH", "TAG",
"VIEW", "DIALECT", "UNKNOWN"]
resourceName:
type: string
code:
@@ -1541,6 +1588,80 @@ components:
$ref: '#/components/schemas/Identifier'
schema:
$ref: '#/components/schemas/ViewSchema'
+ AlterViewRequest:
+ type: object
+ properties:
+ changes:
+ type: array
+ items:
+ $ref: '#/components/schemas/ViewChange'
+ ViewChange:
+ anyOf:
+ - $ref: '#/components/schemas/SetViewOption'
+ - $ref: '#/components/schemas/RemoveViewOption'
+ - $ref: '#/components/schemas/UpdateViewComment'
+ - $ref: '#/components/schemas/AddDialect'
+ - $ref: '#/components/schemas/UpdateDialect'
+ - $ref: '#/components/schemas/DropDialect'
+ required:
+ - action
+ properties:
+ action:
+ type: string
+ SetViewOption:
+ type: object
+ properties:
+ action:
+ type: string
+ const: "setOption"
+ key:
+ type: string
+ value:
+ type: string
+ RemoveViewOption:
+ type: object
+ properties:
+ action:
+ type: string
+ const: "removeOption"
+ key:
+ type: string
+ UpdateViewComment:
+ type: object
+ properties:
+ action:
+ type: string
+ const: "comment"
+ key:
+ type: string
+ AddDialect:
+ type: object
+ properties:
+ action:
+ type: string
+ const: "addDialect"
+ dialect:
+ type: string
+ query:
+ type: string
+ UpdateDialect:
+ type: object
+ properties:
+ action:
+ type: string
+ const: "updateDialect"
+ dialect:
+ type: string
+ query:
+ type: string
+ DropDialect:
+ type: object
+ properties:
+ action:
+ type: string
+ const: "dropDialect"
+ dialect:
+ type: string
DataField:
type: object
properties:
@@ -1668,12 +1789,17 @@ components:
- $ref: '#/components/schemas/UpdateColumnType'
- $ref: '#/components/schemas/UpdateColumnPosition'
- $ref: '#/components/schemas/UpdateColumnNullability'
+ required:
+ - action
+ properties:
+ action:
+ type: string
SetOption:
type: object
properties:
action:
type: string
- enum: ["setOption"]
+ const: "setOption"
key:
type: string
value:
@@ -1683,7 +1809,7 @@ components:
properties:
action:
type: string
- enum: ["removeOption"]
+ const: "removeOption"
key:
type: string
UpdateComment:
@@ -1691,7 +1817,7 @@ components:
properties:
action:
type: string
- enum: ["updateComment"]
+ const: "updateComment"
comment:
type: string
AddColumn:
@@ -1699,7 +1825,7 @@ components:
properties:
action:
type: string
- enum: ["addColumn"]
+ const: "addColumn"
fieldNames:
type: array
items:
@@ -1715,7 +1841,7 @@ components:
properties:
action:
type: string
- enum: ["renameColumn"]
+ const: "renameColumn"
fieldNames:
type: array
items:
@@ -1727,7 +1853,7 @@ components:
properties:
action:
type: string
- enum: ["dropColumn"]
+ const: "dropColumn"
fieldNames:
type: array
items:
@@ -1737,7 +1863,7 @@ components:
properties:
action:
type: string
- enum: [ "updateColumnComment" ]
+ const: "updateColumnComment"
fieldNames:
type: array
items:
@@ -1749,7 +1875,7 @@ components:
properties:
action:
type: string
- enum: [ "updateColumnType" ]
+ const: "updateColumnType"
fieldNames:
type: array
items:
@@ -1763,7 +1889,7 @@ components:
properties:
action:
type: string
- enum: [ "updateColumnPosition" ]
+ const: "updateColumnPosition"
move:
$ref: '#/components/schemas/Move'
UpdateColumnNullability:
@@ -1771,7 +1897,7 @@ components:
properties:
action:
type: string
- enum: [ "update_column_nullability" ]
+ const: "update_column_nullability"
fieldNames:
type: array
items:
@@ -1812,12 +1938,17 @@ components:
anyOf:
- $ref: '#/components/schemas/SnapshotInstant'
- $ref: '#/components/schemas/TagInstant'
+ required:
+ - type
+ properties:
+ type:
+ type: string
SnapshotInstant:
type: object
properties:
'type':
type: string
- enum: [ "snapshot" ]
+ const: "snapshot"
snapshotId:
type: integer
format: int64
@@ -1826,7 +1957,7 @@ components:
properties:
'type':
type: string
- enum: [ "tag" ]
+ const: "tag"
tagName:
type: string
Snapshot:
diff --git
a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
index 307a088ad4..1fd2ac2fbf 100644
---
a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
+++
b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
@@ -21,6 +21,7 @@ package org.apache.paimon.open.api;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.rest.requests.AlterDatabaseRequest;
import org.apache.paimon.rest.requests.AlterTableRequest;
+import org.apache.paimon.rest.requests.AlterViewRequest;
import org.apache.paimon.rest.requests.CommitTableRequest;
import org.apache.paimon.rest.requests.CreateBranchRequest;
import org.apache.paimon.rest.requests.CreateDatabaseRequest;
@@ -896,4 +897,32 @@ public class RESTCatalogController {
@PathVariable String prefix,
@PathVariable String database,
@PathVariable String view) {}
+
+ @Operation(
+ summary = "Alter view",
+ tags = {"view"})
+ @ApiResponses({
+ @ApiResponse(responseCode = "200", description = "Success, no
content"),
+ @ApiResponse(
+ responseCode = "401",
+ description = "Unauthorized",
+ content = {@Content(schema = @Schema(implementation =
ErrorResponse.class))}),
+ @ApiResponse(
+ responseCode = "404",
+ description = "Resource not found",
+ content = {@Content(schema = @Schema(implementation =
ErrorResponse.class))}),
+ @ApiResponse(
+ responseCode = "409",
+ description = "Resource has exist",
+ content = {@Content(schema = @Schema(implementation =
ErrorResponse.class))}),
+ @ApiResponse(
+ responseCode = "500",
+ content = {@Content(schema = @Schema(implementation =
ErrorResponse.class))})
+ })
+ @PostMapping("/v1/{prefix}/databases/{database}/views/{view}")
+ public void alterView(
+ @PathVariable String prefix,
+ @PathVariable String database,
+ @PathVariable String view,
+ @RequestBody AlterViewRequest request) {}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index e1cf70d3bd..53ae3758d5 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -18,6 +18,7 @@
package org.apache.paimon.spark;
+import org.apache.paimon.spark.procedure.AlterViewDialectProcedure;
import org.apache.paimon.spark.procedure.ClearConsumersProcedure;
import org.apache.paimon.spark.procedure.CompactManifestProcedure;
import org.apache.paimon.spark.procedure.CompactProcedure;
@@ -100,6 +101,7 @@ public class SparkProcedures {
procedureBuilders.put("compact_manifest",
CompactManifestProcedure::builder);
procedureBuilders.put("refresh_object_table",
RefreshObjectTableProcedure::builder);
procedureBuilders.put("clear_consumers",
ClearConsumersProcedure::builder);
+ procedureBuilders.put("alter_view_dialect",
AlterViewDialectProcedure::builder);
return procedureBuilders.build();
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java
index 7b4e7dd57e..567b085691 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java
@@ -37,6 +37,8 @@ import static
org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;
/** Catalog methods for working with Views. */
public interface SupportView extends WithPaimonCatalog {
+ String DIALECT = "spark";
+
default List<String> listViews(String[] namespace) throws
NoSuchNamespaceException {
try {
checkNamespace(namespace);
@@ -67,7 +69,7 @@ public interface SupportView extends WithPaimonCatalog {
paimonIdent,
toPaimonRowType(schema).getFields(),
queryText,
- Collections.singletonMap("spark",
queryText),
+ Collections.singletonMap(DIALECT,
queryText),
comment,
properties),
ignoreIfExists);
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterViewDialectProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterViewDialectProcedure.java
new file mode 100644
index 0000000000..029ecb29e6
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterViewDialectProcedure.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.spark.catalog.SupportView;
+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
+import org.apache.paimon.spark.utils.CatalogUtils;
+import org.apache.paimon.utils.StringUtils;
+import org.apache.paimon.view.ViewChange;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * alter view dialect procedure. Usage:
+ *
+ * <pre><code>
+ * -- NOTE: use '' as placeholder for optional arguments
+ *
+ * -- add dialect in the view
+ * CALL sys.alter_view_dialect('view_identifier', 'add', 'spark', 'query')
+ * CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` =>
'add', `query` => 'query')
+ *
+ * -- update dialect in the view
+ * CALL sys.alter_view_dialect('view_identifier', 'update', 'spark', 'query')
+ * CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` =>
'update', `query` => 'query')
+ *
+ * -- drop dialect in the view
+ * CALL sys.alter_view_dialect('view_identifier', 'drop', 'spark')
+ * CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` =>
'drop')
+ *
+ * </code></pre>
+ */
+public class AlterViewDialectProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {
+ ProcedureParameter.required("view", StringType),
+ ProcedureParameter.required("action", StringType),
+ ProcedureParameter.optional("engine", StringType),
+ ProcedureParameter.optional("query", StringType)
+ };
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField("result", DataTypes.BooleanType, true,
Metadata.empty())
+ });
+
+ protected AlterViewDialectProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Catalog paimonCatalog = ((WithPaimonCatalog)
tableCatalog()).paimonCatalog();
+ org.apache.spark.sql.connector.catalog.Identifier ident =
+ toIdentifier(args.getString(0), PARAMETERS[0].name());
+ Identifier view = CatalogUtils.toIdentifier(ident);
+ ViewChange viewChange;
+ String dialect =
+ ((GenericInternalRow) args).genericGet(2) == null
+ ||
StringUtils.isNullOrWhitespaceOnly(args.getString(2))
+ ? SupportView.DIALECT
+ : args.getString(2);
+ String query = ((GenericInternalRow) args).genericGet(3) == null ?
null : args.getString(3);
+ switch (args.getString(1)) {
+ case "add":
+ {
+ if (StringUtils.isNullOrWhitespaceOnly(query)) {
+ throw new IllegalArgumentException("query is required
for add action.");
+ }
+ viewChange = ViewChange.addDialect(dialect, query);
+ break;
+ }
+ case "update":
+ {
+ if (StringUtils.isNullOrWhitespaceOnly(query)) {
+ throw new IllegalArgumentException("query is required
for update action.");
+ }
+ viewChange = ViewChange.updateDialect(dialect, query);
+ break;
+ }
+ case "drop":
+ {
+ viewChange = ViewChange.dropDialect(dialect);
+ break;
+ }
+ default:
+ {
+ throw new IllegalArgumentException("Unsupported action: "
+ args.getString(1));
+ }
+ }
+ try {
+ paimonCatalog.alterView(view, ImmutableList.of(viewChange), false);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return new InternalRow[] {newInternalRow(true)};
+ }
+
+ public static ProcedureBuilder builder() {
+ return new BaseProcedure.Builder<AlterViewDialectProcedure>() {
+ @Override
+ public AlterViewDialectProcedure doBuild() {
+ return new AlterViewDialectProcedure(tableCatalog());
+ }
+ };
+ }
+
+ @Override
+ public String description() {
+ return "AlterViewDialectProcedure";
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
index 4f9ee6ec24..45de6a1c76 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
@@ -59,7 +59,8 @@ case class PaimonViewResolver(spark: SparkSession)
}
private def createViewRelation(nameParts: Seq[String], view: View):
LogicalPlan = {
- val parsedPlan = parseViewText(nameParts.toArray.mkString("."),
view.query("spark"))
+ val parsedPlan =
+ parseViewText(nameParts.toArray.mkString("."),
view.query(SupportView.DIALECT))
val aliases =
SparkTypeUtils.fromPaimonRowType(view.rowType()).fields.zipWithIndex.map {
case (expected, pos) =>
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala
index 941fa24bad..6e807d63ea 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala
@@ -23,7 +23,6 @@ import
org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
import org.apache.paimon.view.View
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute,
GenericInternalRow}
import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString,
quoteIfNeeded, StringUtils}
import org.apache.spark.sql.connector.catalog.Identifier
@@ -129,7 +128,7 @@ case class ShowCreatePaimonViewExec(output: Seq[Attribute],
catalog: SupportView
showDataColumns(view, builder)
showComment(view, builder)
showProperties(view, builder)
- builder ++= s"AS\n${view.query("spark")}\n"
+ builder ++= s"AS\n${view.query(SupportView.DIALECT)}\n"
Seq(new GenericInternalRow(values =
Array(UTF8String.fromString(builder.toString))))
}
@@ -203,7 +202,7 @@ case class DescribePaimonViewExec(
rows += row("# Detailed View Information", "", "")
rows += row("Name", view.fullName(), "")
rows += row("Comment", view.comment().orElse(""), "")
- rows += row("View Text", view.query("spark"), "")
+ rows += row("View Text", view.query(SupportView.DIALECT), "")
rows += row(
"View Query Output Columns",
view.rowType().getFieldNames.asScala.mkString("[", ", ", "]"),
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonRestCatalogSparkTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonRestCatalogSparkTestBase.scala
new file mode 100644
index 0000000000..124a089031
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonRestCatalogSparkTestBase.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.paimon.options.CatalogOptions
+import org.apache.paimon.rest.{RESTCatalogFactory, RESTCatalogInternalOptions,
RESTCatalogServer}
+import org.apache.paimon.rest.auth.{AuthProviderEnum, BearTokenAuthProvider}
+import org.apache.paimon.rest.responses.ConfigResponse
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions
+
+import java.util.UUID
+
+class PaimonRestCatalogSparkTestBase extends PaimonSparkTestBase {
+
+ private var restCatalogServer: RESTCatalogServer = null
+ private var serverUrl: String = null
+ private var warehouse: String = null
+ private val initToken = "init_token"
+
+ override protected def beforeAll(): Unit = {
+ warehouse = UUID.randomUUID.toString
+ val config = new ConfigResponse(
+ ImmutableMap.of(
+ RESTCatalogInternalOptions.PREFIX.key,
+ "paimon",
+ CatalogOptions.WAREHOUSE.key,
+ warehouse),
+ ImmutableMap.of())
+ val authProvider = new BearTokenAuthProvider(initToken)
+ restCatalogServer =
+ new RESTCatalogServer(tempDBDir.getCanonicalPath, authProvider, config,
warehouse)
+ restCatalogServer.start()
+ serverUrl = restCatalogServer.getUrl
+ super.beforeAll()
+ }
+
+ override protected def afterAll(): Unit = {
+ try {
+ super.afterAll()
+ } finally {
+ restCatalogServer.shutdown()
+ }
+ }
+
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.sql.catalog.paimon.metastore", RESTCatalogFactory.IDENTIFIER)
+ .set("spark.sql.catalog.paimon.uri", serverUrl)
+ .set("spark.sql.catalog.paimon.token", initToken)
+ .set("spark.sql.catalog.paimon.warehouse", warehouse)
+ .set("spark.sql.catalog.paimon.token.provider",
AuthProviderEnum.BEAR.identifier)
+ }
+}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterViewDialectProcedureTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterViewDialectProcedureTest.scala
new file mode 100644
index 0000000000..1db9410a46
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterViewDialectProcedureTest.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonRestCatalogSparkTestBase
+import org.apache.paimon.spark.catalog.SupportView
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions
+
+class AlterViewDialectProcedureTest extends PaimonRestCatalogSparkTestBase {
+ test(s"test alter view dialect procedure") {
+ val viewName = "view_test"
+ spark.sql(s"""
+ |CREATE TABLE T (id INT, name STRING)
+ |""".stripMargin)
+
+ val query = "SELECT * FROM T WHERE `id` > 1";
+ spark.sql(s"""
+ |CREATE VIEW $viewName as $query
+ |""".stripMargin)
+ val checkViewQuery = (view: String, query: String) =>
+ Assertions
+ .assertThat(
+ spark
+ .sql(s"desc extended $view")
+ .filter("col_name = 'View Text'")
+ .select("data_type")
+ .collect()(0)(0))
+ .isEqualTo(query)
+
+ checkViewQuery(viewName, query)
+
+ checkAnswer(
+ spark.sql(s"CALL sys.alter_view_dialect('$viewName', 'drop',
'${SupportView.DIALECT}')"),
+ Row(true))
+
+ checkAnswer(
+ spark.sql(
+ s"CALL sys.alter_view_dialect('$viewName', 'add',
'${SupportView.DIALECT}', '$query')"),
+ Row(true))
+
+ checkViewQuery(viewName, query)
+
+ val newQuery = "SELECT * FROM T WHERE `id` > 2";
+
+ checkAnswer(
+ spark.sql(
+ s"CALL sys.alter_view_dialect('$viewName', 'update',
'${SupportView.DIALECT}', '$newQuery')"),
+ Row(true))
+
+ checkViewQuery(viewName, newQuery)
+
+ checkAnswer(
+ spark.sql(s"CALL sys.alter_view_dialect(`view` => '$viewName', `action`
=> 'drop')"),
+ Row(true))
+
+ checkAnswer(
+ spark.sql(
+ s"CALL sys.alter_view_dialect(`view` => '$viewName', `action` =>
'add', `query` => '$query')"),
+ Row(true))
+ checkViewQuery(viewName, query)
+
+ checkAnswer(
+ spark.sql(
+ s"CALL sys.alter_view_dialect(`view` => '$viewName', `action` =>
'update', `query` => '$newQuery')"),
+ Row(true))
+ checkViewQuery(viewName, newQuery)
+ }
+}