This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit abf3091e9da92660490e600b7605f3e2d2d53ad4
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Fri Jul 3 16:30:34 2020 +0200

    [FLINK-18419] Make user ClassLoader available in TableEnvironment
---
 .../table/client/gateway/local/ExecutionContext.java     |  9 ++++++---
 .../bridge/java/internal/StreamTableEnvironmentImpl.java | 16 +++++++++++++---
 .../java/internal/StreamTableEnvironmentImplTest.java    |  3 ++-
 .../flink/table/api/internal/TableEnvironmentImpl.java   |  8 ++++++--
 .../apache/flink/table/utils/TableEnvironmentMock.java   | 10 +++++++++-
 .../scala/internal/StreamTableEnvironmentImpl.scala      |  9 ++++++---
 .../scala/internal/StreamTableEnvironmentImplTest.scala  |  3 ++-
 .../apache/flink/table/planner/utils/TableTestBase.scala |  9 ++++++---
 .../table/api/stream/StreamTableEnvironmentTest.scala    |  3 ++-
 .../flink/table/api/stream/sql/AggregateTest.scala       |  3 ++-
 .../org/apache/flink/table/utils/TableTestBase.scala     |  6 ++++--
 11 files changed, 58 insertions(+), 21 deletions(-)

diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 0a444eb2..60e7736 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -424,7 +424,8 @@ public class ExecutionContext<ClusterID> {
                        Executor executor,
                        CatalogManager catalogManager,
                        ModuleManager moduleManager,
-                       FunctionCatalog functionCatalog) {
+                       FunctionCatalog functionCatalog,
+                       ClassLoader userClassLoader) {
 
                final Map<String, String> plannerProperties = 
settings.toPlannerProperties();
                final Planner planner = 
ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
@@ -438,7 +439,8 @@ public class ExecutionContext<ClusterID> {
                        env,
                        planner,
                        executor,
-                       settings.isStreamingMode());
+                       settings.isStreamingMode(),
+                       userClassLoader);
        }
 
        private static Executor lookupExecutor(
@@ -599,7 +601,8 @@ public class ExecutionContext<ClusterID> {
                                        executor,
                                        catalogManager,
                                        moduleManager,
-                                       functionCatalog);
+                                       functionCatalog,
+                                       classLoader);
                } else if (environment.getExecution().isBatchPlanner()) {
                        streamExecEnv = null;
                        execEnv = 
ExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
index 15877ad..05c2f20 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
@@ -89,8 +89,17 @@ public final class StreamTableEnvironmentImpl extends 
TableEnvironmentImpl imple
                        StreamExecutionEnvironment executionEnvironment,
                        Planner planner,
                        Executor executor,
-                       boolean isStreamingMode) {
-               super(catalogManager, moduleManager, tableConfig, executor, 
functionCatalog, planner, isStreamingMode);
+                       boolean isStreamingMode,
+                       ClassLoader userClassLoader) {
+               super(
+                       catalogManager,
+                       moduleManager,
+                       tableConfig,
+                       executor,
+                       functionCatalog,
+                       planner,
+                       isStreamingMode,
+                       userClassLoader);
                this.executionEnvironment = executionEnvironment;
        }
 
@@ -137,7 +146,8 @@ public final class StreamTableEnvironmentImpl extends 
TableEnvironmentImpl imple
                        executionEnvironment,
                        planner,
                        executor,
-                       settings.isStreamingMode()
+                       settings.isStreamingMode(),
+                       classLoader
                );
        }
 
diff --git 
a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
 
b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
index 954fc45..7fcbf80 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
@@ -101,7 +101,8 @@ public class StreamTableEnvironmentImplTest {
                        env,
                        new TestPlanner(elements.getTransformation()),
                        new ExecutorMock(),
-                       true
+                       true,
+                       this.getClass().getClassLoader()
                );
        }
 
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 05c578b..954edb5 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -164,6 +164,7 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
        protected final Planner planner;
        protected final Parser parser;
        private final boolean isStreamingMode;
+       private final ClassLoader userClassLoader;
        private static final String UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG =
                        "Unsupported SQL query! sqlUpdate() only accepts a 
single SQL statement of type " +
                        "INSERT, CREATE TABLE, DROP TABLE, ALTER TABLE, USE 
CATALOG, USE [CATALOG.]DATABASE, " +
@@ -197,7 +198,8 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
                        Executor executor,
                        FunctionCatalog functionCatalog,
                        Planner planner,
-                       boolean isStreamingMode) {
+                       boolean isStreamingMode,
+                       ClassLoader userClassLoader) {
                this.catalogManager = catalogManager;
                this.catalogManager.setCatalogTableSchemaResolver(
                                new 
CatalogTableSchemaResolver(planner.getParser(), isStreamingMode));
@@ -210,6 +212,7 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
                this.planner = planner;
                this.parser = planner.getParser();
                this.isStreamingMode = isStreamingMode;
+               this.userClassLoader = userClassLoader;
                this.operationTreeBuilder = OperationTreeBuilder.create(
                        tableConfig,
                        functionCatalog.asLookup(parser::parseIdentifier),
@@ -272,7 +275,8 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
                        executor,
                        functionCatalog,
                        planner,
-                       settings.isStreamingMode()
+                       settings.isStreamingMode(),
+                       classLoader
                );
        }
 
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
index dcbec07..5d0f021 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
@@ -46,7 +46,15 @@ public class TableEnvironmentMock extends 
TableEnvironmentImpl {
                        FunctionCatalog functionCatalog,
                        PlannerMock planner,
                        boolean isStreamingMode) {
-               super(catalogManager, moduleManager, tableConfig, executor, 
functionCatalog, planner, isStreamingMode);
+               super(
+                       catalogManager,
+                       moduleManager,
+                       tableConfig,
+                       executor,
+                       functionCatalog,
+                       planner,
+                       isStreamingMode,
+                       TableEnvironmentMock.class.getClassLoader());
 
                this.catalogManager = catalogManager;
                this.executor = executor;
diff --git 
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
 
b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
index 7c99e37..317cb6a 100644
--- 
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
+++ 
b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
@@ -58,7 +58,8 @@ class StreamTableEnvironmentImpl (
     scalaExecutionEnvironment: StreamExecutionEnvironment,
     planner: Planner,
     executor: Executor,
-    isStreaming: Boolean)
+    isStreaming: Boolean,
+    userClassLoader: ClassLoader)
   extends TableEnvironmentImpl(
     catalogManager,
     moduleManager,
@@ -66,7 +67,8 @@ class StreamTableEnvironmentImpl (
     executor,
     functionCatalog,
     planner,
-    isStreaming)
+    isStreaming,
+    userClassLoader)
   with org.apache.flink.table.api.bridge.scala.StreamTableEnvironment {
 
   override def fromDataStream[T](dataStream: DataStream[T]): Table = {
@@ -299,7 +301,8 @@ object StreamTableEnvironmentImpl {
       executionEnvironment,
       planner,
       executor,
-      settings.isStreamingMode
+      settings.isStreamingMode,
+      classLoader
     )
   }
 
diff --git 
a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
 
b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
index 0f84d9b..7f67ebb 100644
--- 
a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
+++ 
b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
@@ -91,7 +91,8 @@ class StreamTableEnvironmentImplTest {
       env,
       new TestPlanner(elements.javaStream.getTransformation),
       new ExecutorMock,
-      true)
+      true,
+      this.getClass.getClassLoader)
   }
 
   private class TestPlanner(transformation: Transformation[_]) extends 
PlannerMock {
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 05b54c7..c942911 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -959,7 +959,8 @@ class TestingTableEnvironment private(
     executor: Executor,
     functionCatalog: FunctionCatalog,
     planner: PlannerBase,
-    isStreamingMode: Boolean)
+    isStreamingMode: Boolean,
+    userClassLoader: ClassLoader)
   extends TableEnvironmentImpl(
     catalogManager,
     moduleManager,
@@ -967,7 +968,8 @@ class TestingTableEnvironment private(
     executor,
     functionCatalog,
     planner,
-    isStreamingMode) {
+    isStreamingMode,
+    userClassLoader) {
 
   // just for testing, remove this method while
   // `<T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> 
aggregateFunction);`
@@ -1118,7 +1120,8 @@ object TestingTableEnvironment {
       executor,
       functionCatalog,
       planner,
-      settings.isStreamingMode)
+      settings.isStreamingMode,
+      classLoader)
   }
 }
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 5ef1158..406e6b4 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -425,7 +425,8 @@ class StreamTableEnvironmentTest extends TableTestBase {
       jStreamExecEnv,
       streamPlanner,
       executor,
-      true)
+      true,
+      Thread.currentThread().getContextClassLoader)
 
     val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, 
Types.INT, Types.LONG)
       .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
index f7da2d6..835a26f 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
@@ -79,7 +79,8 @@ class AggregateTest extends TableTestBase {
       Mockito.mock(classOf[StreamExecutionEnvironment]),
       new PlannerMock,
       Mockito.mock(classOf[Executor]),
-      true
+      true,
+      Thread.currentThread().getContextClassLoader
     )
 
     tablEnv.registerFunction("udag", new MyAgg)
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 44e0bb9..562a240 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -347,7 +347,8 @@ case class StreamTableTestUtil(
     javaEnv,
     streamPlanner,
     executor,
-    true)
+    true,
+    Thread.currentThread().getContextClassLoader)
 
   val env = new StreamExecutionEnvironment(javaEnv)
   val tableEnv = new ScalaStreamTableEnvironmentImpl(
@@ -358,7 +359,8 @@ case class StreamTableTestUtil(
     env,
     streamPlanner,
     executor,
-    true)
+    true,
+    Thread.currentThread().getContextClassLoader)
 
   def addTable[T: TypeInformation](
       name: String,

Reply via email to