This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c91e35c9ee [flink] FlinkCatalog#listTables returns both tables and views (#5910)
c91e35c9ee is described below

commit c91e35c9eee59aeb23734383b83c8753f72fc5a5
Author: Xuannan <[email protected]>
AuthorDate: Wed Jul 23 10:07:51 2025 +0800

    [flink] FlinkCatalog#listTables returns both tables and views (#5910)
---
 .../java/org/apache/paimon/flink/FlinkCatalog.java |   5 +-
 .../paimon/flink/FlinkRestCatalogITCase.java       | 142 +++++++++++++++++++++
 .../apache/paimon/flink/RESTCatalogITCaseBase.java |   6 +-
 3 files changed, 149 insertions(+), 4 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index e7275a9470..f4a1142031 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -283,7 +283,10 @@ public class FlinkCatalog extends AbstractCatalog {
     public List<String> listTables(String databaseName)
             throws DatabaseNotExistException, CatalogException {
         try {
-            return catalog.listTables(databaseName);
+            List<String> tablesAndViews = new ArrayList<>();
+            tablesAndViews.addAll(catalog.listViews(databaseName));
+            tablesAndViews.addAll(catalog.listTables(databaseName));
+            return tablesAndViews;
         } catch (Catalog.DatabaseNotExistException e) {
             throw new DatabaseNotExistException(getName(), e.database());
         }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRestCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRestCatalogITCase.java
new file mode 100644
index 0000000000..be92ce163d
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRestCatalogITCase.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTCatalogOptions;
+import org.apache.paimon.rest.RESTTestFileIO;
+import org.apache.paimon.rest.auth.AuthProviderEnum;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogView;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+/** Test for {@link FlinkCatalog} with Rest metastore. */
+public class FlinkRestCatalogITCase extends RESTCatalogITCaseBase {
+
+    private FlinkCatalog catalog;
+
+    @BeforeEach
+    @Override
+    public void before() throws IOException {
+        super.before();
+        initFlinkCatalog();
+    }
+
+    @Test
+    void testListTableReturnsView() throws Exception {
+        catalog.createDatabase("test", null, true);
+        catalog.createTable(
+                new ObjectPath("test", "t1"), createTable(Collections.emptyMap()), true);
+        assertThat(catalog.listTables("test")).containsExactly("t1");
+        catalog.createTable(new ObjectPath("test", "v"), createView(), true);
+        assertThat(catalog.listTables("test")).containsExactlyInAnyOrder("t1", "v");
+    }
+
+    private CatalogTable createTable(Map<String, String> options) {
+        ResolvedSchema resolvedSchema = this.createSchema();
+        CatalogTable origin =
+                CatalogTable.newBuilder()
+                        .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+                        .comment("test comment")
+                        .partitionKeys(Collections.emptyList())
+                        .options(options)
+                        .build();
+        return new ResolvedCatalogTable(origin, resolvedSchema);
+    }
+
+    private CatalogView createView() {
+        ResolvedSchema resolvedSchema = this.createSchema();
+
+        String query = "SELECT * FROM test.t1";
+        CatalogView view =
+                CatalogView.of(
+                        Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+                        null,
+                        query,
+                        query,
+                        Collections.emptyMap());
+        return new ResolvedCatalogView(view, resolvedSchema);
+    }
+
+    private ResolvedSchema createSchema() {
+        return new ResolvedSchema(
+                Arrays.asList(
+                        Column.physical("first", org.apache.flink.table.api.DataTypes.STRING()),
+                        Column.physical("second", org.apache.flink.table.api.DataTypes.INT()),
+                        Column.physical("third", org.apache.flink.table.api.DataTypes.STRING()),
+                        Column.physical(
+                                "four",
+                                org.apache.flink.table.api.DataTypes.ROW(
+                                        org.apache.flink.table.api.DataTypes.FIELD(
+                                                "f1",
+                                                org.apache.flink.table.api.DataTypes.STRING()),
+                                        org.apache.flink.table.api.DataTypes.FIELD(
+                                                "f2", org.apache.flink.table.api.DataTypes.INT()),
+                                        org.apache.flink.table.api.DataTypes.FIELD(
+                                                "f3",
+                                                org.apache.flink.table.api.DataTypes.MAP(
+                                                        org.apache.flink.table.api.DataTypes
+                                                                .STRING(),
+                                                        DataTypes.INT()))))),
+                Collections.emptyList(),
+                null);
+    }
+
+    private void initFlinkCatalog() {
+        Map<String, String> authMap =
+                ImmutableMap.of(
+                        RESTCatalogOptions.TOKEN.key(),
+                        RESTCatalogITCaseBase.INIT_TOKEN,
+                        RESTCatalogOptions.TOKEN_PROVIDER.key(),
+                        AuthProviderEnum.BEAR.identifier());
+        Options options = new Options();
+        for (Map.Entry<String, String> entry : authMap.entrySet()) {
+            options.set(entry.getKey(), entry.getValue());
+        }
+        options.set(CatalogOptions.METASTORE, "rest");
+        options.set(CatalogOptions.WAREHOUSE.key(), warehouse);
+        options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
+        options.set(RESTTestFileIO.DATA_PATH_CONF_KEY, path);
+        catalog =
+                FlinkCatalogFactory.createCatalog(
+                        "c",
+                        CatalogContext.create(options),
+                        FlinkRestCatalogITCase.class.getClassLoader());
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java
index 5c214568ee..7da91520dd 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java
@@ -45,18 +45,18 @@ public abstract class RESTCatalogITCaseBase extends CatalogITCaseBase {
 
     protected static final String DATABASE_NAME = "mydb";
     protected static final String TABLE_NAME = "t1";
+    protected static final String INIT_TOKEN = "init_token";
 
     protected RESTCatalogServer restCatalogServer;
 
     private String serverUrl;
     private String dataPath;
-    private String warehouse;
+    protected String warehouse;
     @TempDir java.nio.file.Path tempFile;
 
     @BeforeEach
     @Override
     public void before() throws IOException {
-        String initToken = "init_token";
         dataPath = tempFile.toUri().toString();
         warehouse = UUID.randomUUID().toString();
         ConfigResponse config =
@@ -69,7 +69,7 @@ public abstract class RESTCatalogITCaseBase extends CatalogITCaseBase {
                                 CatalogOptions.WAREHOUSE.key(),
                                 warehouse),
                         ImmutableMap.of());
-        AuthProvider authProvider = new BearTokenAuthProvider(initToken);
+        AuthProvider authProvider = new BearTokenAuthProvider(INIT_TOKEN);
         restCatalogServer = new RESTCatalogServer(dataPath, authProvider, config, warehouse);
         restCatalogServer.start();
         serverUrl = restCatalogServer.getUrl();

Reply via email to