This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1aa92e897e7d71a1063b87ee08496edac6e4b633
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Fri Jul 3 16:31:23 2020 +0200

    [FLINK-18419] Create catalog in TableEnvironment using user ClassLoader
---
 .../table/api/internal/TableEnvironmentImpl.java   | 30 ++++++++----
 .../operations/ddl/CreateCatalogOperation.java     | 12 ++---
 flink-table/flink-table-planner-blink/pom.xml      |  8 ++++
 .../operations/SqlToOperationConverter.java        |  8 +---
 .../flink/table/planner/catalog/CatalogITCase.java | 54 ++++++++++++++++++++++
 5 files changed, 90 insertions(+), 22 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 99306a0..d4f43db 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -72,7 +72,9 @@ import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.descriptors.StreamTableDescriptor;
 import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.ComponentFactoryService;
+import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.functions.UserDefinedFunctionHelper;
@@ -1014,15 +1016,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                } else if (operation instanceof AlterCatalogFunctionOperation) {
                        return 
alterCatalogFunction((AlterCatalogFunctionOperation) operation);
                } else if (operation instanceof CreateCatalogOperation) {
-                       CreateCatalogOperation createCatalogOperation = 
(CreateCatalogOperation) operation;
-                       String exMsg = 
getDDLOpExecuteErrorMsg(createCatalogOperation.asSummaryString());
-                       try {
-                               catalogManager.registerCatalog(
-                                               
createCatalogOperation.getCatalogName(), createCatalogOperation.getCatalog());
-                               return TableResultImpl.TABLE_RESULT_OK;
-                       } catch (CatalogException e) {
-                               throw new ValidationException(exMsg, e);
-                       }
+                       return createCatalog((CreateCatalogOperation) 
operation);
                } else if (operation instanceof DropCatalogOperation) {
                        DropCatalogOperation dropCatalogOperation = 
(DropCatalogOperation) operation;
                        String exMsg = 
getDDLOpExecuteErrorMsg(dropCatalogOperation.asSummaryString());
@@ -1078,6 +1072,24 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                }
        }
 
+       private TableResult createCatalog(CreateCatalogOperation operation) {
+               String exMsg = 
getDDLOpExecuteErrorMsg(operation.asSummaryString());
+               try {
+                       String catalogName = operation.getCatalogName();
+                       Map<String, String> properties = 
operation.getProperties();
+                       final CatalogFactory factory = TableFactoryService.find(
+                               CatalogFactory.class,
+                               properties,
+                               userClassLoader);
+
+                       Catalog catalog = factory.createCatalog(catalogName, 
properties);
+                       catalogManager.registerCatalog(catalogName, catalog);
+                       return TableResultImpl.TABLE_RESULT_OK;
+               } catch (CatalogException e) {
+                       throw new ValidationException(exMsg, e);
+               }
+       }
+
        private TableResult buildShowResult(String columnName, String[] 
objects) {
                return buildResult(
                        new String[]{columnName},
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java
index 1d04a20..c81ee1d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.operations.ddl;
 
-import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.OperationUtils;
 
@@ -33,25 +32,26 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class CreateCatalogOperation implements CreateOperation {
        private final String catalogName;
-       private final Catalog catalog;
+       private final Map<String, String> properties;
 
-       public CreateCatalogOperation(String catalogName, Catalog catalog) {
+       public CreateCatalogOperation(String catalogName, Map<String, String> 
properties) {
                this.catalogName = checkNotNull(catalogName);
-               this.catalog = checkNotNull(catalog);
+               this.properties = checkNotNull(properties);
        }
 
        public String getCatalogName() {
                return catalogName;
        }
 
-       public Catalog getCatalog() {
-               return catalog;
+       public Map<String, String> getProperties() {
+               return Collections.unmodifiableMap(properties);
        }
 
        @Override
        public String asSummaryString() {
                Map<String, Object> params = new LinkedHashMap<>();
                params.put("catalogName", catalogName);
+               params.put("properties", properties);
 
                return OperationUtils.formatWithChildren(
                        "CREATE CATALOG",
diff --git a/flink-table/flink-table-planner-blink/pom.xml b/flink-table/flink-table-planner-blink/pom.xml
index a7cfb3a..085eba5 100644
--- a/flink-table/flink-table-planner-blink/pom.xml
+++ b/flink-table/flink-table-planner-blink/pom.xml
@@ -223,6 +223,14 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
                <!-- SuccessException used in TestValuesTableFactory -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 386cb9a..be54497 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -74,8 +74,6 @@ import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.factories.CatalogFactory;
-import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.ExplainOperation;
@@ -553,11 +551,7 @@ public class SqlToOperationConverter {
                sqlCreateCatalog.getPropertyList().getList().forEach(p ->
                        properties.put(((SqlTableOption) p).getKeyString(), 
((SqlTableOption) p).getValueString()));
 
-               final CatalogFactory factory =
-                       TableFactoryService.find(CatalogFactory.class, 
properties, this.getClass().getClassLoader());
-
-               Catalog catalog = factory.createCatalog(catalogName, 
properties);
-               return new CreateCatalogOperation(catalogName, catalog);
+               return new CreateCatalogOperation(catalogName, properties);
        }
 
        /** Convert DROP CATALOG statement. */
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
index 4e8e95b..8a46c1e 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
@@ -23,8 +23,14 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.testutils.ClassLoaderUtils;
+import org.apache.flink.util.TemporaryClassLoaderContext;
 
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.net.URLClassLoader;
 
 import static 
org.apache.flink.table.descriptors.GenericInMemoryCatalogValidator.CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY;
 import static org.junit.Assert.assertFalse;
@@ -35,6 +41,9 @@ import static org.junit.Assert.assertTrue;
  */
 public class CatalogITCase {
 
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
        @Test
        public void testCreateCatalog() {
                String name = "c1";
@@ -61,9 +70,54 @@ public class CatalogITCase {
                assertFalse(tableEnv.getCatalog(name).isPresent());
        }
 
+       @Test
+       public void testCreateCatalogFromUserClassLoader() throws Exception {
+               final String className = "UserCatalogFactory";
+               URLClassLoader classLoader = 
ClassLoaderUtils.withRoot(temporaryFolder.newFolder())
+                       
.addResource("META-INF/services/org.apache.flink.table.factories.TableFactory", 
"UserCatalogFactory")
+                       .addClass(
+                               className,
+                               "import 
org.apache.flink.table.catalog.GenericInMemoryCatalog;\n" +
+                                       "import 
org.apache.flink.table.factories.CatalogFactory;\n" +
+                                       "import java.util.Collections;\n" +
+                                       "import 
org.apache.flink.table.catalog.Catalog;\n" +
+                                       "import java.util.HashMap;\n" +
+                                       "import java.util.List;\n" +
+                                       "import java.util.Map;\n" +
+                                       "\tpublic class UserCatalogFactory 
implements CatalogFactory {\n" +
+                                       "\t\t@Override\n" +
+                                       "\t\tpublic Catalog createCatalog(\n" +
+                                       "\t\t\t\tString name,\n" +
+                                       "\t\t\t\tMap<String, String> 
properties) {\n" +
+                                       "\t\t\treturn new 
GenericInMemoryCatalog(name);\n" +
+                                       "\t\t}\n" +
+                                       "\n" +
+                                       "\t\t@Override\n" +
+                                       "\t\tpublic Map<String, String> 
requiredContext() {\n" +
+                                       "\t\t\tHashMap<String, String> hashMap 
= new HashMap<>();\n" +
+                                       "\t\t\thashMap.put(\"type\", 
\"userCatalog\");\n" +
+                                       "\t\t\treturn hashMap;\n" +
+                                       "\t\t}\n" +
+                                       "\n" +
+                                       "\t\t@Override\n" +
+                                       "\t\tpublic List<String> 
supportedProperties() {\n" +
+                                       "\t\t\treturn 
Collections.emptyList();\n" +
+                                       "\t\t}\n" +
+                                       "\t}"
+                       ).build();
+
+               try (TemporaryClassLoaderContext context = 
TemporaryClassLoaderContext.of(classLoader)) {
+                       TableEnvironment tableEnvironment = 
getTableEnvironment();
+                       tableEnvironment.executeSql("CREATE CATALOG cat WITH 
('type'='userCatalog')");
+
+                       
assertTrue(tableEnvironment.getCatalog("cat").isPresent());
+               }
+       }
+
        private TableEnvironment getTableEnvironment() {
                EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().build();
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                return StreamTableEnvironment.create(env, settings);
        }
+
 }

Reply via email to