This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new b80c6cde [FLINK-29922] Validate that only managed table is supported in File system catalog
b80c6cde is described below

commit b80c6cde4f3a4804d1772f20e51ba1a7a960b245
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Tue Nov 15 14:34:17 2022 +0800

    [FLINK-29922] Validate that only managed table is supported in File system catalog
---
 .../table/store/file/catalog/FileSystemCatalogFactory.java   |  7 +++++++
 .../flink/table/store/file/catalog/CatalogFactoryTest.java   | 12 ++++++++++++
 2 files changed, 19 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java
index 98acb1ae..7ef9ec64 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java
@@ -20,6 +20,9 @@ package org.apache.flink.table.store.file.catalog;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.table.TableType;
+
+import static org.apache.flink.table.store.CatalogOptions.TABLE_TYPE;
 
 /** Factory to create {@link FileSystemCatalog}. */
 public class FileSystemCatalogFactory implements CatalogFactory {
@@ -33,6 +36,10 @@ public class FileSystemCatalogFactory implements CatalogFactory {
 
     @Override
     public Catalog create(String warehouse, Configuration options) {
+        if (!TableType.MANAGED.equals(options.get(TABLE_TYPE))) {
+            throw new IllegalArgumentException(
+                    "Only managed table is supported in File system catalog.");
+        }
         return new FileSystemCatalog(new Path(warehouse));
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/CatalogFactoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/CatalogFactoryTest.java
index 75eaf6bc..ae9d3847 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/CatalogFactoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/CatalogFactoryTest.java
@@ -21,12 +21,14 @@ package org.apache.flink.table.store.file.catalog;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.table.TableType;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 
+import static org.apache.flink.table.store.CatalogOptions.TABLE_TYPE;
 import static org.apache.flink.table.store.CatalogOptions.WAREHOUSE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -52,4 +54,14 @@ public class CatalogFactoryTest {
         assertThatThrownBy(() -> CatalogFactory.createCatalog(options))
                 .hasMessageContaining("should be a directory");
     }
+
+    @Test
+    public void testNonManagedTable(@TempDir java.nio.file.Path path) {
+        Path root = new Path(path.toUri().toString());
+        Configuration options = new Configuration();
+        options.set(WAREHOUSE, new Path(root, "warehouse").toString());
+        options.set(TABLE_TYPE, TableType.EXTERNAL);
+        assertThatThrownBy(() -> CatalogFactory.createCatalog(options))
+                .hasMessageContaining("Only managed table is supported in File system catalog.");
+    }
 }

Reply via email to