This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 84f0632b15c2a192aa22a525c7b4937f80f20a34
Author: fengli <ldliu...@163.com>
AuthorDate: Tue Apr 30 16:23:25 2024 +0800

    [FLINK-35195][test/test-filesystem] test-filesystem Catalog support create generic table
---
 .../file/testutils/TestFileSystemTableFactory.java | 35 +++++++++-
 .../testutils/catalog/TestFileSystemCatalog.java   | 26 +++++--
 .../catalog/TestFileSystemCatalogITCase.java       | 79 +++++++++++++++++++++-
 .../catalog/TestFileSystemCatalogTest.java         | 38 +++++++++++
 4 files changed, 170 insertions(+), 8 deletions(-)

diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java
index aa5cd5e17bb..c7009af581c 100644
--- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java
@@ -22,10 +22,14 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.file.table.FileSystemTableFactory;
 import org.apache.flink.connector.file.table.TestFileSystemTableSource;
 import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DeserializationFormatFactory;
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalog;
+
+import java.util.Collections;
 
 /** Test filesystem {@link Factory}. */
 @Internal
@@ -40,9 +44,21 @@ public class TestFileSystemTableFactory extends FileSystemTableFactory {
 
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
+        final boolean isFileSystemTable =
+                
TestFileSystemCatalog.isFileSystemTable(context.getCatalogTable().getOptions());
+        if (!isFileSystemTable) {
+            return FactoryUtil.createDynamicTableSource(
+                    null,
+                    context.getObjectIdentifier(),
+                    context.getCatalogTable(),
+                    Collections.emptyMap(),
+                    context.getConfiguration(),
+                    context.getClassLoader(),
+                    context.isTemporary());
+        }
+
         FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
         validate(helper);
-
         return new TestFileSystemTableSource(
                 context.getObjectIdentifier(),
                 context.getPhysicalRowDataType(),
@@ -51,4 +67,21 @@ public class TestFileSystemTableFactory extends FileSystemTableFactory {
                 discoverDecodingFormat(context, BulkReaderFormatFactory.class),
                 discoverDecodingFormat(context, 
DeserializationFormatFactory.class));
     }
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        final boolean isFileSystemTable =
+                
TestFileSystemCatalog.isFileSystemTable(context.getCatalogTable().getOptions());
+        if (!isFileSystemTable) {
+            return FactoryUtil.createDynamicTableSink(
+                    null,
+                    context.getObjectIdentifier(),
+                    context.getCatalogTable(),
+                    Collections.emptyMap(),
+                    context.getConfiguration(),
+                    context.getClassLoader(),
+                    context.isTemporary());
+        }
+        return super.createDynamicTableSink(context);
+    }
 }
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java
index 490dd29d608..6d64ecee032 100644
--- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.file.testutils.catalog;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
@@ -78,6 +79,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static 
org.apache.flink.table.file.testutils.TestFileSystemTableFactory.IDENTIFIER;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** A catalog implementation for test {@link FileSystem}. */
@@ -276,6 +279,13 @@ public class TestFileSystemCatalog extends AbstractCatalog {
         }
     }
 
+    @Internal
+    public static boolean isFileSystemTable(Map<String, String> properties) {
+        String connector = properties.get(CONNECTOR.key());
+        return StringUtils.isNullOrWhitespaceOnly(connector)
+                || IDENTIFIER.equalsIgnoreCase(connector);
+    }
+
     @Override
     public boolean tableExists(ObjectPath tablePath) throws CatalogException {
         Path path = inferTablePath(catalogPathStr, tablePath);
@@ -346,7 +356,9 @@ public class TestFileSystemCatalog extends AbstractCatalog {
             if (!fs.exists(path)) {
                 fs.mkdirs(path);
                 fs.mkdirs(tableSchemaPath);
-                fs.mkdirs(tableDataPath);
+                if (isFileSystemTable(catalogTable.getOptions())) {
+                    fs.mkdirs(tableDataPath);
+                }
             }
 
             // write table schema
@@ -649,16 +661,20 @@ public class TestFileSystemCatalog extends AbstractCatalog {
             String tableDataPath) {
         if (CatalogBaseTable.TableKind.TABLE == tableKind) {
             CatalogTable catalogTable = 
CatalogPropertiesUtil.deserializeCatalogTable(properties);
-            // put table data path
             Map<String, String> options = new 
HashMap<>(catalogTable.getOptions());
-            options.put(FileSystemConnectorOptions.PATH.key(), tableDataPath);
+            if (isFileSystemTable(catalogTable.getOptions())) {
+                // put table data path
+                options.put(FileSystemConnectorOptions.PATH.key(), 
tableDataPath);
+            }
             return catalogTable.copy(options);
         } else if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE == tableKind) 
{
             CatalogMaterializedTable catalogMaterializedTable =
                     
CatalogPropertiesUtil.deserializeCatalogMaterializedTable(properties);
-            // put table data path
             Map<String, String> options = new 
HashMap<>(catalogMaterializedTable.getOptions());
-            options.put(FileSystemConnectorOptions.PATH.key(), tableDataPath);
+            if (isFileSystemTable(catalogMaterializedTable.getOptions())) {
+                // put table data path
+                options.put(FileSystemConnectorOptions.PATH.key(), 
tableDataPath);
+            }
             return catalogMaterializedTable.copy(options);
         }
 
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogITCase.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogITCase.java
index e418f6c80a1..4058853ae04 100644
--- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogITCase.java
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogITCase.java
@@ -20,11 +20,13 @@ package org.apache.flink.table.file.testutils.catalog;
 
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
 
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -44,7 +46,6 @@ public class TestFileSystemCatalogITCase extends TestFileSystemCatalogTestBase {
                         isStreamingMode
                                 ? EnvironmentSettings.inStreamingMode()
                                 : EnvironmentSettings.inBatchMode());
-
         tEnv.registerCatalog(TEST_CATALOG, catalog);
         tEnv.useCatalog(TEST_CATALOG);
 
@@ -84,6 +85,80 @@ public class TestFileSystemCatalogITCase extends TestFileSystemCatalogTestBase {
                         Row.of(1003L, "user3", "ciao", "2021-06-10 10:02:00"),
                         Row.of(1004L, "user4", "你好", "2021-06-10 10:03:00"));
 
-        assertThat(expected).isEqualTo((Lists.newArrayList(result)));
+        assertThat(Lists.newArrayList(result)).isEqualTo(expected);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testReadDatagenTable(boolean isStreamingMode) {
+        TableEnvironment tEnv =
+                TableEnvironment.create(
+                        isStreamingMode
+                                ? EnvironmentSettings.inStreamingMode()
+                                : EnvironmentSettings.inBatchMode());
+        tEnv.registerCatalog(TEST_CATALOG, catalog);
+        tEnv.useCatalog(TEST_CATALOG);
+
+        tEnv.executeSql(
+                "CREATE TABLE datagenSource (\n"
+                        + "  id BIGINT,\n"
+                        + "  user_name STRING,\n"
+                        + "  message STRING,\n"
+                        + "  log_ts STRING\n"
+                        + ") WITH (\n"
+                        + "  'connector' = 'datagen',\n"
+                        + "  'number-of-rows' = '10'\n"
+                        + ")");
+
+        tEnv.getConfig().getConfiguration().setString("parallelism.default", 
"1");
+        CloseableIterator<Row> result =
+                tEnv.executeSql(
+                                String.format(
+                                        "SELECT * FROM %s.%s.datagenSource",
+                                        TEST_CATALOG, TEST_DEFAULT_DATABASE))
+                        .collect();
+
+        // assert query result size
+        assertThat(Lists.newArrayList(result).size()).isEqualTo(10);
+    }
+
+    @Test
+    public void testWriteTestValuesSinkTable() throws Exception {
+        TableEnvironment tEnv = 
TableEnvironment.create(EnvironmentSettings.newInstance().build());
+        tEnv.registerCatalog(TEST_CATALOG, catalog);
+        tEnv.useCatalog(TEST_CATALOG);
+
+        tEnv.executeSql(
+                "CREATE TABLE valueSink (\n"
+                        + "  id BIGINT,\n"
+                        + "  user_name STRING,\n"
+                        + "  message STRING,\n"
+                        + "  log_ts STRING\n"
+                        + ") WITH (\n"
+                        + "  'connector' = 'values'\n"
+                        + ")");
+
+        tEnv.getConfig().getConfiguration().setString("parallelism.default", 
"1");
+        tEnv.getConfig().getConfiguration().setString("parallelism.default", 
"1");
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO %s.%s.valueSink VALUES\n"
+                                        + "(1001, 'user1', 'hello world', 
'2021-06-10 10:00:00'),\n"
+                                        + "(1002, 'user2', 'hi', '2021-06-10 
10:01:00'),\n"
+                                        + "(1003, 'user3', 'ciao', '2021-06-10 
10:02:00'),\n"
+                                        + "(1004, 'user4', '你好', '2021-06-10 
10:03:00')",
+                                TEST_CATALOG, TEST_DEFAULT_DATABASE))
+                .await();
+
+        // assert query result size
+        List<Row> expected =
+                Arrays.asList(
+                        Row.of(1001L, "user1", "hello world", "2021-06-10 
10:00:00"),
+                        Row.of(1002L, "user2", "hi", "2021-06-10 10:01:00"),
+                        Row.of(1003L, "user3", "ciao", "2021-06-10 10:02:00"),
+                        Row.of(1004L, "user4", "你好", "2021-06-10 10:03:00"));
+
+        List<Row> actual = TestValuesTableFactory.getRawResults("valueSink");
+        assertThat(actual).isEqualTo(expected);
     }
 }
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
index 07b11583d4f..7d4d9ae1afb 100644
--- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
@@ -256,6 +256,44 @@ public class TestFileSystemCatalogTest extends TestFileSystemCatalogTestBase {
                 () -> catalog.createTable(tablePath, 
EXPECTED_CATALOG_MATERIALIZED_TABLE, false));
     }
 
+    @Test
+    public void testCreateAndGetGenericTable() throws Exception {
+        ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
+        // test create datagen table
+        Map<String, String> options = new HashMap<>();
+        options.put("connector", "datagen");
+        options.put("number-of-rows", "10");
+        ResolvedCatalogTable datagenResolvedTable =
+                new ResolvedCatalogTable(
+                        CatalogTable.newBuilder()
+                                .schema(CREATE_SCHEMA)
+                                .comment("test generic table")
+                                .options(options)
+                                .build(),
+                        CREATE_RESOLVED_SCHEMA);
+
+        catalog.createTable(tablePath, datagenResolvedTable, true);
+
+        // test table exist
+        assertThat(catalog.tableExists(tablePath)).isTrue();
+
+        // test get table
+        CatalogBaseTable actualTable = catalog.getTable(tablePath);
+
+        // validate table type
+        
assertThat(actualTable.getTableKind()).isEqualTo(CatalogBaseTable.TableKind.TABLE);
+        // validate schema
+        assertThat(actualTable.getUnresolvedSchema().resolve(new 
TestSchemaResolver()))
+                .isEqualTo(CREATE_RESOLVED_SCHEMA);
+        // validate options
+        assertThat(actualTable.getOptions()).isEqualTo(options);
+
+        // test create exist table
+        assertThrows(
+                TableAlreadyExistException.class,
+                () -> catalog.createTable(tablePath, datagenResolvedTable, 
false));
+    }
+
     @Test
     public void testListTable() throws Exception {
         ObjectPath tablePath1 = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");

Reply via email to