This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 96e4bbf239 [flink] by default, use paimon catalog to create views in
flink generic catalog (#5611)
96e4bbf239 is described below
commit 96e4bbf239b1f0f1e0fcc8907e1b00429aa1cf17
Author: Kerwin <[email protected]>
AuthorDate: Mon May 19 11:54:37 2025 +0800
[flink] by default, use paimon catalog to create views in flink generic
catalog (#5611)
---
.../java/org/apache/paimon/flink/FlinkGenericCatalog.java | 10 ++++++++++
.../org/apache/paimon/hive/FlinkGenericCatalogITCase.java | 11 +++++++++++
2 files changed, 21 insertions(+)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
index 75af5917bb..5f7a05d885 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -179,6 +180,15 @@ public class FlinkGenericCatalog extends AbstractCatalog {
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table,
boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
+ // create view
+ // TODO: By default, use the Paimon catalog to create the view. Creating
views with options is
+ // not supported.
+ if (table instanceof CatalogView) {
+ paimon.createTable(tablePath, table, ignoreIfExists);
+ return;
+ }
+
+ // create table
String connector = table.getOptions().get(CONNECTOR.key());
if (connector == null) {
throw new RuntimeException(
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java
index b82c5e579c..30d4953224 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java
@@ -219,4 +219,15 @@ public class FlinkGenericCatalogITCase extends
AbstractTestBaseJUnit4 {
List<Row> result = sql("SELECT tag_name FROM paimon_t$tags");
assertThat(result).contains(Row.of("tag_1"));
}
+
+ @Test
+ public void testCreateView() {
+ sql("CREATE TABLE paimon_t ( " + "f0 INT, " + "f1 INT " + ") WITH
('connector'='paimon')");
+ sql("INSERT INTO paimon_t VALUES (1, 1), (2, 2)");
+ assertThat(sql("SELECT * FROM paimon_t"))
+ .containsExactlyInAnyOrder(Row.of(1, 1), Row.of(2, 2));
+ sql("CREATE VIEW paimon_t_view AS SELECT * FROM paimon_t WHERE f0=1");
+
+ assertThat(sql("SELECT * FROM
paimon_t_view")).containsExactlyInAnyOrder(Row.of(1, 1));
+ }
}