This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 84f0632b15c2a192aa22a525c7b4937f80f20a34 Author: fengli <ldliu...@163.com> AuthorDate: Tue Apr 30 16:23:25 2024 +0800 [FLINK-35195][test/test-filesystem] test-filesystem Catalog support create generic table --- .../file/testutils/TestFileSystemTableFactory.java | 35 +++++++++- .../testutils/catalog/TestFileSystemCatalog.java | 26 +++++-- .../catalog/TestFileSystemCatalogITCase.java | 79 +++++++++++++++++++++- .../catalog/TestFileSystemCatalogTest.java | 38 +++++++++++ 4 files changed, 170 insertions(+), 8 deletions(-) diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java index aa5cd5e17bb..c7009af581c 100644 --- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java @@ -22,10 +22,14 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.connector.file.table.FileSystemTableFactory; import org.apache.flink.connector.file.table.TestFileSystemTableSource; import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory; +import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DeserializationFormatFactory; import org.apache.flink.table.factories.Factory; import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalog; + +import java.util.Collections; /** Test filesystem {@link Factory}. 
*/ @Internal @@ -40,9 +44,21 @@ public class TestFileSystemTableFactory extends FileSystemTableFactory { @Override public DynamicTableSource createDynamicTableSource(Context context) { + final boolean isFileSystemTable = + TestFileSystemCatalog.isFileSystemTable(context.getCatalogTable().getOptions()); + if (!isFileSystemTable) { + return FactoryUtil.createDynamicTableSource( + null, + context.getObjectIdentifier(), + context.getCatalogTable(), + Collections.emptyMap(), + context.getConfiguration(), + context.getClassLoader(), + context.isTemporary()); + } + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); validate(helper); - return new TestFileSystemTableSource( context.getObjectIdentifier(), context.getPhysicalRowDataType(), @@ -51,4 +67,21 @@ public class TestFileSystemTableFactory extends FileSystemTableFactory { discoverDecodingFormat(context, BulkReaderFormatFactory.class), discoverDecodingFormat(context, DeserializationFormatFactory.class)); } + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + final boolean isFileSystemTable = + TestFileSystemCatalog.isFileSystemTable(context.getCatalogTable().getOptions()); + if (!isFileSystemTable) { + return FactoryUtil.createDynamicTableSink( + null, + context.getObjectIdentifier(), + context.getCatalogTable(), + Collections.emptyMap(), + context.getConfiguration(), + context.getClassLoader(), + context.isTemporary()); + } + return super.createDynamicTableSink(context); + } } diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java index 490dd29d608..6d64ecee032 100644 --- 
a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java @@ -18,6 +18,7 @@ package org.apache.flink.table.file.testutils.catalog; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.connector.file.table.FileSystemConnectorOptions; @@ -78,6 +79,8 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; +import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; +import static org.apache.flink.table.file.testutils.TestFileSystemTableFactory.IDENTIFIER; import static org.apache.flink.util.Preconditions.checkArgument; /** A catalog implementation for test {@link FileSystem}. */ @@ -276,6 +279,13 @@ public class TestFileSystemCatalog extends AbstractCatalog { } } + @Internal + public static boolean isFileSystemTable(Map<String, String> properties) { + String connector = properties.get(CONNECTOR.key()); + return StringUtils.isNullOrWhitespaceOnly(connector) + || IDENTIFIER.equalsIgnoreCase(connector); + } + @Override public boolean tableExists(ObjectPath tablePath) throws CatalogException { Path path = inferTablePath(catalogPathStr, tablePath); @@ -346,7 +356,9 @@ public class TestFileSystemCatalog extends AbstractCatalog { if (!fs.exists(path)) { fs.mkdirs(path); fs.mkdirs(tableSchemaPath); - fs.mkdirs(tableDataPath); + if (isFileSystemTable(catalogTable.getOptions())) { + fs.mkdirs(tableDataPath); + } } // write table schema @@ -649,16 +661,20 @@ public class TestFileSystemCatalog extends AbstractCatalog { String tableDataPath) { if (CatalogBaseTable.TableKind.TABLE == tableKind) { CatalogTable catalogTable = CatalogPropertiesUtil.deserializeCatalogTable(properties); - // 
put table data path Map<String, String> options = new HashMap<>(catalogTable.getOptions()); - options.put(FileSystemConnectorOptions.PATH.key(), tableDataPath); + if (isFileSystemTable(catalogTable.getOptions())) { + // put table data path + options.put(FileSystemConnectorOptions.PATH.key(), tableDataPath); + } return catalogTable.copy(options); } else if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE == tableKind) { CatalogMaterializedTable catalogMaterializedTable = CatalogPropertiesUtil.deserializeCatalogMaterializedTable(properties); - // put table data path Map<String, String> options = new HashMap<>(catalogMaterializedTable.getOptions()); - options.put(FileSystemConnectorOptions.PATH.key(), tableDataPath); + if (isFileSystemTable(catalogMaterializedTable.getOptions())) { + // put table data path + options.put(FileSystemConnectorOptions.PATH.key(), tableDataPath); + } return catalogMaterializedTable.copy(options); } diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogITCase.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogITCase.java index e418f6c80a1..4058853ae04 100644 --- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogITCase.java +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogITCase.java @@ -20,11 +20,13 @@ package org.apache.flink.table.file.testutils.catalog; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; import 
org.apache.flink.shaded.guava31.com.google.common.collect.Lists; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -44,7 +46,6 @@ public class TestFileSystemCatalogITCase extends TestFileSystemCatalogTestBase { isStreamingMode ? EnvironmentSettings.inStreamingMode() : EnvironmentSettings.inBatchMode()); - tEnv.registerCatalog(TEST_CATALOG, catalog); tEnv.useCatalog(TEST_CATALOG); @@ -84,6 +85,80 @@ public class TestFileSystemCatalogITCase extends TestFileSystemCatalogTestBase { Row.of(1003L, "user3", "ciao", "2021-06-10 10:02:00"), Row.of(1004L, "user4", "你好", "2021-06-10 10:03:00")); - assertThat(expected).isEqualTo((Lists.newArrayList(result))); + assertThat(Lists.newArrayList(result)).isEqualTo(expected); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testReadDatagenTable(boolean isStreamingMode) { + TableEnvironment tEnv = + TableEnvironment.create( + isStreamingMode + ? 
EnvironmentSettings.inStreamingMode() + : EnvironmentSettings.inBatchMode()); + tEnv.registerCatalog(TEST_CATALOG, catalog); + tEnv.useCatalog(TEST_CATALOG); + + tEnv.executeSql( + "CREATE TABLE datagenSource (\n" + + " id BIGINT,\n" + + " user_name STRING,\n" + + " message STRING,\n" + + " log_ts STRING\n" + + ") WITH (\n" + + " 'connector' = 'datagen',\n" + + " 'number-of-rows' = '10'\n" + + ")"); + + tEnv.getConfig().getConfiguration().setString("parallelism.default", "1"); + CloseableIterator<Row> result = + tEnv.executeSql( + String.format( + "SELECT * FROM %s.%s.datagenSource", + TEST_CATALOG, TEST_DEFAULT_DATABASE)) + .collect(); + + // assert query result size + assertThat(Lists.newArrayList(result).size()).isEqualTo(10); + } + + @Test + public void testWriteTestValuesSinkTable() throws Exception { + TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build()); + tEnv.registerCatalog(TEST_CATALOG, catalog); + tEnv.useCatalog(TEST_CATALOG); + + tEnv.executeSql( + "CREATE TABLE valueSink (\n" + + " id BIGINT,\n" + + " user_name STRING,\n" + + " message STRING,\n" + + " log_ts STRING\n" + + ") WITH (\n" + + " 'connector' = 'values'\n" + + ")"); + + tEnv.getConfig().getConfiguration().setString("parallelism.default", "1"); + tEnv.getConfig().getConfiguration().setString("parallelism.default", "1"); + tEnv.executeSql( + String.format( + "INSERT INTO %s.%s.valueSink VALUES\n" + + "(1001, 'user1', 'hello world', '2021-06-10 10:00:00'),\n" + + "(1002, 'user2', 'hi', '2021-06-10 10:01:00'),\n" + + "(1003, 'user3', 'ciao', '2021-06-10 10:02:00'),\n" + + "(1004, 'user4', '你好', '2021-06-10 10:03:00')", + TEST_CATALOG, TEST_DEFAULT_DATABASE)) + .await(); + + // assert query result size + List<Row> expected = + Arrays.asList( + Row.of(1001L, "user1", "hello world", "2021-06-10 10:00:00"), + Row.of(1002L, "user2", "hi", "2021-06-10 10:01:00"), + Row.of(1003L, "user3", "ciao", "2021-06-10 10:02:00"), + Row.of(1004L, "user4", "你好", "2021-06-10 
10:03:00")); + + List<Row> actual = TestValuesTableFactory.getRawResults("valueSink"); + assertThat(actual).isEqualTo(expected); } } diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java index 07b11583d4f..7d4d9ae1afb 100644 --- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java @@ -256,6 +256,44 @@ public class TestFileSystemCatalogTest extends TestFileSystemCatalogTestBase { () -> catalog.createTable(tablePath, EXPECTED_CATALOG_MATERIALIZED_TABLE, false)); } + @Test + public void testCreateAndGetGenericTable() throws Exception { + ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1"); + // test create datagen table + Map<String, String> options = new HashMap<>(); + options.put("connector", "datagen"); + options.put("number-of-rows", "10"); + ResolvedCatalogTable datagenResolvedTable = + new ResolvedCatalogTable( + CatalogTable.newBuilder() + .schema(CREATE_SCHEMA) + .comment("test generic table") + .options(options) + .build(), + CREATE_RESOLVED_SCHEMA); + + catalog.createTable(tablePath, datagenResolvedTable, true); + + // test table exist + assertThat(catalog.tableExists(tablePath)).isTrue(); + + // test get table + CatalogBaseTable actualTable = catalog.getTable(tablePath); + + // validate table type + assertThat(actualTable.getTableKind()).isEqualTo(CatalogBaseTable.TableKind.TABLE); + // validate schema + assertThat(actualTable.getUnresolvedSchema().resolve(new TestSchemaResolver())) + .isEqualTo(CREATE_RESOLVED_SCHEMA); + // validate 
options + assertThat(actualTable.getOptions()).isEqualTo(options); + + // test create exist table + assertThrows( + TableAlreadyExistException.class, + () -> catalog.createTable(tablePath, datagenResolvedTable, false)); + } + @Test public void testListTable() throws Exception { ObjectPath tablePath1 = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");