wuchong commented on a change in pull request #14944:
URL: https://github.com/apache/flink/pull/14944#discussion_r577748924



##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
##########
@@ -30,15 +30,15 @@ import 
org.apache.flink.table.planner.runtime.stream.table.FunctionITCase.Simple
 import org.apache.flink.table.planner.utils.TableTestUtil.replaceStageId
 import org.apache.flink.table.planner.utils.{TableTestUtil, 
TestTableSourceSinks}
 import org.apache.flink.types.Row
-
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.sql.SqlExplainLevel
+import org.apache.flink.core.testutils.CommonTestUtils.assertThrows
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
 import org.junit.rules.ExpectedException
 import org.junit.{Rule, Test}
 
 import _root_.java.util
-
+import java.util.concurrent.Callable

Review comment:
       Please reorg the imports. 

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
##########
@@ -505,6 +505,81 @@ class TableEnvironmentTest {
       tableResult.collect())
   }
 
+  @Test
+  def testExecuteSqlWithLoadModule: Unit = {
+    val result = tableEnv.executeSql("LOAD MODULE dummy")
+    assertEquals(ResultKind.SUCCESS, result.getResultKind)
+    assert(tableEnv.listModules().sameElements(Array[String]("core", "dummy")))
+  }
+
+  @Test
+  def testExecuteSqlWithLoadParameterizedModule(): Unit = {
+    val statement1 =
+      """
+        |LOAD MODULE dummy WITH (
+        |  'dummy-version' = '1'
+        |)
+      """.stripMargin
+    val result = tableEnv.executeSql(statement1)
+    assertEquals(ResultKind.SUCCESS, result.getResultKind)
+    assert(tableEnv.listModules().sameElements(Array[String]("core", "dummy")))
+
+    val statement2 =
+      """
+        |LOAD MODULE dummy WITH (
+        |'dummy-version' = '2'
+        |)
+      """.stripMargin
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage(
+      "Could not execute LOAD MODULE: (moduleName: [dummy], properties: 
[{dummy-version=2}])." +
+        " A module with name 'dummy' already exists")
+    tableEnv.executeSql(statement2)
+  }
+
+  @Test
+  def testExecuteSqlWithLoadCaseSensitiveModuleName(): Unit = {
+    val statement1 =
+      """
+        |LOAD MODULE Dummy WITH (
+        |  'dummy-version' = '1'
+        |)
+      """.stripMargin
+
+    val code = new Callable[TableResult] {
+      override def call(): TableResult = tableEnv.executeSql(statement1)
+    }
+    assertThrows(

Review comment:
       I suggest to catch the exception and use `containsCause` in this case. 
Using a inner method looks not very clean. 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -864,6 +874,31 @@ private Operation 
convertDescribeTable(SqlRichDescribeTable sqlRichDescribeTable
         return new DescribeTableOperation(identifier, 
sqlRichDescribeTable.isExtended());
     }
 
+    /** Convert LOAD MODULE statement. */
+    private Operation convertLoadModule(SqlLoadModule sqlLoadModule) {
+        String moduleName = sqlLoadModule.moduleName();
+        Map<String, String> properties = new HashMap<>();
+        for (SqlNode node : sqlLoadModule.getPropertyList().getList()) {
+            SqlTableOption option = (SqlTableOption) node;
+            if (MODULE_TYPE.equals(option.getKeyString())) {

Review comment:
       Could you also move this check to TableEnv? This logic should appare 
with "type" injection.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to