wuchong commented on a change in pull request #14944: URL: https://github.com/apache/flink/pull/14944#discussion_r577748924
########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala ########## @@ -30,15 +30,15 @@ import org.apache.flink.table.planner.runtime.stream.table.FunctionITCase.Simple import org.apache.flink.table.planner.utils.TableTestUtil.replaceStageId import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSourceSinks} import org.apache.flink.types.Row - import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.sql.SqlExplainLevel +import org.apache.flink.core.testutils.CommonTestUtils.assertThrows import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail} import org.junit.rules.ExpectedException import org.junit.{Rule, Test} import _root_.java.util - +import java.util.concurrent.Callable Review comment: Please reorganize the imports. ########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala ########## @@ -505,6 +505,81 @@ class TableEnvironmentTest { tableResult.collect()) } + @Test + def testExecuteSqlWithLoadModule: Unit = { + val result = tableEnv.executeSql("LOAD MODULE dummy") + assertEquals(ResultKind.SUCCESS, result.getResultKind) + assert(tableEnv.listModules().sameElements(Array[String]("core", "dummy"))) + } + + @Test + def testExecuteSqlWithLoadParameterizedModule(): Unit = { + val statement1 = + """ + |LOAD MODULE dummy WITH ( + | 'dummy-version' = '1' + |) + """.stripMargin + val result = tableEnv.executeSql(statement1) + assertEquals(ResultKind.SUCCESS, result.getResultKind) + assert(tableEnv.listModules().sameElements(Array[String]("core", "dummy"))) + + val statement2 = + """ + |LOAD MODULE dummy WITH ( + |'dummy-version' = '2' + |) + """.stripMargin + expectedException.expect(classOf[ValidationException]) + expectedException.expectMessage( + "Could not execute LOAD MODULE: (moduleName: [dummy], properties: [{dummy-version=2}])." 
+ + " A module with name 'dummy' already exists") + tableEnv.executeSql(statement2) + } + + @Test + def testExecuteSqlWithLoadCaseSensitiveModuleName(): Unit = { + val statement1 = + """ + |LOAD MODULE Dummy WITH ( + | 'dummy-version' = '1' + |) + """.stripMargin + + val code = new Callable[TableResult] { + override def call(): TableResult = tableEnv.executeSql(statement1) + } + assertThrows( Review comment: I suggest catching the exception and using `containsCause` in this case. Using an inner method does not look very clean. ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java ########## @@ -864,6 +874,31 @@ private Operation convertDescribeTable(SqlRichDescribeTable sqlRichDescribeTable return new DescribeTableOperation(identifier, sqlRichDescribeTable.isExtended()); } + /** Convert LOAD MODULE statement. */ + private Operation convertLoadModule(SqlLoadModule sqlLoadModule) { + String moduleName = sqlLoadModule.moduleName(); + Map<String, String> properties = new HashMap<>(); + for (SqlNode node : sqlLoadModule.getPropertyList().getList()) { + SqlTableOption option = (SqlTableOption) node; + if (MODULE_TYPE.equals(option.getKeyString())) { Review comment: Could you also move this check to TableEnv? This logic should appear with "type" injection. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org