This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1aa92e897e7d71a1063b87ee08496edac6e4b633 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Fri Jul 3 16:31:23 2020 +0200 [FLINK-18419] Create catalog in TableEnvironment using user ClassLoader --- .../table/api/internal/TableEnvironmentImpl.java | 30 ++++++++---- .../operations/ddl/CreateCatalogOperation.java | 12 ++--- flink-table/flink-table-planner-blink/pom.xml | 8 ++++ .../operations/SqlToOperationConverter.java | 8 +--- .../flink/table/planner/catalog/CatalogITCase.java | 54 ++++++++++++++++++++++ 5 files changed, 90 insertions(+), 22 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 99306a0..d4f43db 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -72,7 +72,9 @@ import org.apache.flink.table.descriptors.ConnectorDescriptor; import org.apache.flink.table.descriptors.StreamTableDescriptor; import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.ComponentFactoryService; +import org.apache.flink.table.factories.TableFactoryService; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.UserDefinedFunction; import org.apache.flink.table.functions.UserDefinedFunctionHelper; @@ -1014,15 +1016,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { } else if (operation instanceof AlterCatalogFunctionOperation) { return alterCatalogFunction((AlterCatalogFunctionOperation) operation); } else if (operation instanceof CreateCatalogOperation) { - CreateCatalogOperation createCatalogOperation = (CreateCatalogOperation) operation; - String exMsg = getDDLOpExecuteErrorMsg(createCatalogOperation.asSummaryString()); - try { - catalogManager.registerCatalog( - createCatalogOperation.getCatalogName(), createCatalogOperation.getCatalog()); - return TableResultImpl.TABLE_RESULT_OK; - } catch (CatalogException e) { - throw new ValidationException(exMsg, e); - } + return createCatalog((CreateCatalogOperation) operation); } else if (operation instanceof DropCatalogOperation) { DropCatalogOperation dropCatalogOperation = (DropCatalogOperation) operation; String exMsg = getDDLOpExecuteErrorMsg(dropCatalogOperation.asSummaryString()); @@ -1078,6 +1072,24 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { } } + private TableResult createCatalog(CreateCatalogOperation operation) { + String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString()); + try { + String catalogName = operation.getCatalogName(); + Map<String, String> properties = operation.getProperties(); + final CatalogFactory factory = TableFactoryService.find( + CatalogFactory.class, + properties, + userClassLoader); + + Catalog catalog = factory.createCatalog(catalogName, properties); + catalogManager.registerCatalog(catalogName, catalog); + return TableResultImpl.TABLE_RESULT_OK; + } catch (CatalogException e) { + throw new ValidationException(exMsg, e); + } + } + private TableResult buildShowResult(String columnName, String[] objects) { return buildResult( new String[]{columnName}, diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java index 1d04a20..c81ee1d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java @@ -18,7 +18,6 @@ package org.apache.flink.table.operations.ddl; -import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.OperationUtils; @@ -33,25 +32,26 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class CreateCatalogOperation implements CreateOperation { private final String catalogName; - private final Catalog catalog; + private final Map<String, String> properties; - public CreateCatalogOperation(String catalogName, Catalog catalog) { + public CreateCatalogOperation(String catalogName, Map<String, String> properties) { this.catalogName = checkNotNull(catalogName); - this.catalog = checkNotNull(catalog); + this.properties = checkNotNull(properties); } public String getCatalogName() { return catalogName; } - public Catalog getCatalog() { - return catalog; + public Map<String, String> getProperties() { + return Collections.unmodifiableMap(properties); } @Override public String asSummaryString() { Map<String, Object> params = new LinkedHashMap<>(); params.put("catalogName", catalogName); + params.put("properties", properties); return OperationUtils.formatWithChildren( "CREATE CATALOG", diff --git a/flink-table/flink-table-planner-blink/pom.xml b/flink-table/flink-table-planner-blink/pom.xml index a7cfb3a..085eba5 100644 --- a/flink-table/flink-table-planner-blink/pom.xml +++ b/flink-table/flink-table-planner-blink/pom.xml @@ -223,6 +223,14 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <!-- SuccessException used in TestValuesTableFactory --> <dependency> <groupId>org.apache.flink</groupId> diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java index 386cb9a..be54497 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java @@ -74,8 +74,6 @@ import org.apache.flink.table.catalog.FunctionLanguage; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; -import org.apache.flink.table.factories.CatalogFactory; -import org.apache.flink.table.factories.TableFactoryService; import org.apache.flink.table.operations.CatalogSinkModifyOperation; import org.apache.flink.table.operations.DescribeTableOperation; import org.apache.flink.table.operations.ExplainOperation; @@ -553,11 +551,7 @@ public class SqlToOperationConverter { sqlCreateCatalog.getPropertyList().getList().forEach(p -> properties.put(((SqlTableOption) p).getKeyString(), ((SqlTableOption) p).getValueString())); - final CatalogFactory factory = - TableFactoryService.find(CatalogFactory.class, properties, this.getClass().getClassLoader()); - - Catalog catalog = factory.createCatalog(catalogName, properties); - return new CreateCatalogOperation(catalogName, catalog); + return new CreateCatalogOperation(catalogName, properties); } /** Convert DROP CATALOG statement. */ diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java index 4e8e95b..8a46c1e 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java @@ -23,8 +23,14 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.testutils.ClassLoaderUtils; +import org.apache.flink.util.TemporaryClassLoaderContext; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.net.URLClassLoader; import static org.apache.flink.table.descriptors.GenericInMemoryCatalogValidator.CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY; import static org.junit.Assert.assertFalse; @@ -35,6 +41,9 @@ import static org.junit.Assert.assertTrue; */ public class CatalogITCase { + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Test public void testCreateCatalog() { String name = "c1"; @@ -61,9 +70,54 @@ public class CatalogITCase { assertFalse(tableEnv.getCatalog(name).isPresent()); } + @Test + public void testCreateCatalogFromUserClassLoader() throws Exception { + final String className = "UserCatalogFactory"; + URLClassLoader classLoader = ClassLoaderUtils.withRoot(temporaryFolder.newFolder()) + .addResource("META-INF/services/org.apache.flink.table.factories.TableFactory", "UserCatalogFactory") + .addClass( + className, + "import org.apache.flink.table.catalog.GenericInMemoryCatalog;\n" + + "import org.apache.flink.table.factories.CatalogFactory;\n" + + "import java.util.Collections;\n" + + "import org.apache.flink.table.catalog.Catalog;\n" + + "import java.util.HashMap;\n" + + "import java.util.List;\n" + + "import java.util.Map;\n" + + "\tpublic class UserCatalogFactory implements CatalogFactory {\n" + + "\t\t@Override\n" + + "\t\tpublic Catalog createCatalog(\n" + + "\t\t\t\tString name,\n" + + "\t\t\t\tMap<String, String> properties) {\n" + + "\t\t\treturn new GenericInMemoryCatalog(name);\n" + + "\t\t}\n" + + "\n" + + "\t\t@Override\n" + + "\t\tpublic Map<String, String> requiredContext() {\n" + + "\t\t\tHashMap<String, String> hashMap = new HashMap<>();\n" + + "\t\t\thashMap.put(\"type\", \"userCatalog\");\n" + + "\t\t\treturn hashMap;\n" + + "\t\t}\n" + + "\n" + + "\t\t@Override\n" + + "\t\tpublic List<String> supportedProperties() {\n" + + "\t\t\treturn Collections.emptyList();\n" + + "\t\t}\n" + + "\t}" + ).build(); + + try (TemporaryClassLoaderContext context = TemporaryClassLoaderContext.of(classLoader)) { + TableEnvironment tableEnvironment = getTableEnvironment(); + tableEnvironment.executeSql("CREATE CATALOG cat WITH ('type'='userCatalog')"); + + assertTrue(tableEnvironment.getCatalog("cat").isPresent()); + } + } + private TableEnvironment getTableEnvironment() { EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); return StreamTableEnvironment.create(env, settings); } + }