wuchong commented on a change in pull request #14944:
URL: https://github.com/apache/flink/pull/14944#discussion_r576913197



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -864,6 +874,32 @@ private Operation convertDescribeTable(SqlRichDescribeTable sqlRichDescribeTable
         return new DescribeTableOperation(identifier, sqlRichDescribeTable.isExtended());
     }
 
+    /** Convert LOAD MODULE statement. */
+    private Operation convertLoadModule(SqlLoadModule sqlLoadModule) {
+        String moduleName = sqlLoadModule.moduleName();
+        Map<String, String> properties = new HashMap<>();
+        for (SqlNode node : sqlLoadModule.getPropertyList().getList()) {
+            SqlTableOption option = (SqlTableOption) node;
+            if (MODULE_TYPE.equals(option.getKeyString())) {
+                String moduleType = option.getValueString();
+                throw new ValidationException(
+                        String.format(
+                                "Property 'type' = '%s' is not supported, please remove it and "
+                                        + "rename module to '%s' and try again",
+                                moduleType, moduleType));

Review comment:
       The module name might already be `moduleType`. Maybe we can just mention that the 'type' property is not supported and that the module name is used to find the module.
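
A rewording along those lines might look like this (a hypothetical sketch; the method name and exact wording are assumptions, not the PR's actual message):

```java
// Hypothetical sketch of the message the reviewer suggests: state that the
// 'type' option is unsupported and that the module name itself drives
// discovery, instead of telling the user to "rename" the module.
public class LoadModuleMessageSketch {

    static String unsupportedTypeMessage(String moduleType) {
        return String.format(
                "Option 'type' = '%s' is not supported. "
                        + "The module name is used to find the module.",
                moduleType);
    }

    public static void main(String[] args) {
        System.out.println(unsupportedTypeMessage("dummy"));
    }
}
```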

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -864,6 +874,32 @@ private Operation convertDescribeTable(SqlRichDescribeTable sqlRichDescribeTable
         return new DescribeTableOperation(identifier, sqlRichDescribeTable.isExtended());
     }
 
+    /** Convert LOAD MODULE statement. */
+    private Operation convertLoadModule(SqlLoadModule sqlLoadModule) {
+        String moduleName = sqlLoadModule.moduleName();
+        Map<String, String> properties = new HashMap<>();
+        for (SqlNode node : sqlLoadModule.getPropertyList().getList()) {
+            SqlTableOption option = (SqlTableOption) node;
+            if (MODULE_TYPE.equals(option.getKeyString())) {
+                String moduleType = option.getValueString();
+                throw new ValidationException(
+                        String.format(
+                                "Property 'type' = '%s' is not supported, please remove it and "
+                                        + "rename module to '%s' and try again",
+                                moduleType, moduleType));
+            }
+            properties.put(option.getKeyString(), option.getValueString());
+        }
+        properties.put(MODULE_TYPE, moduleName);
+        return new LoadModuleOperation(moduleName, properties);
+    }
+
+    /** Convert UNLOAD MODULE statement. */
+    private Operation convertUnloadModule(SqlUnloadModule sqlUnloadModule) {
+        String moduleName = sqlUnloadModule.moduleName().toLowerCase();

Review comment:
       Why toLowerCase here?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -864,6 +874,32 @@ private Operation convertDescribeTable(SqlRichDescribeTable sqlRichDescribeTable
         return new DescribeTableOperation(identifier, sqlRichDescribeTable.isExtended());
     }
 
+    /** Convert LOAD MODULE statement. */
+    private Operation convertLoadModule(SqlLoadModule sqlLoadModule) {
+        String moduleName = sqlLoadModule.moduleName();
+        Map<String, String> properties = new HashMap<>();
+        for (SqlNode node : sqlLoadModule.getPropertyList().getList()) {
+            SqlTableOption option = (SqlTableOption) node;
+            if (MODULE_TYPE.equals(option.getKeyString())) {
+                String moduleType = option.getValueString();
+                throw new ValidationException(
+                        String.format(
+                                "Property 'type' = '%s' is not supported, please remove it and "
+                                        + "rename module to '%s' and try again",
+                                moduleType, moduleType));
+            }
+            properties.put(option.getKeyString(), option.getValueString());
+        }
+        properties.put(MODULE_TYPE, moduleName);

Review comment:
       I'm not sure about this. I would prefer to delay the "type" injection until the module factory is discovered (i.e. in the TableEnv), because in the future we may migrate to the new Factory, which doesn't need the "type" property. Have a look at `org.apache.flink.table.factories.FactoryUtil#discoverFactory`: an additional "type" property will make the discovery fail. From my point of view, an operation should purely describe the SQL syntax without adding any processing logic.
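
The separation argued for here could be sketched roughly as follows (a simplified illustration with made-up class names, not Flink's actual code): the operation only records what the SQL statement said, and the legacy "type" key is injected only on the code path that still needs it.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Simplified sketch: the operation purely describes the SQL statement,
// carrying the module name and the WITH options verbatim.
class LoadModuleOperationSketch {
    private final String moduleName;
    private final Map<String, String> options;

    LoadModuleOperationSketch(String moduleName, Map<String, String> options) {
        this.moduleName = moduleName;
        this.options = new HashMap<>(options);
    }

    String getModuleName() {
        return moduleName;
    }

    Map<String, String> getOptions() {
        return Collections.unmodifiableMap(options);
    }
}

class ModuleDiscoverySketch {
    // Only the legacy discovery path adds the "type" key. A new-style
    // factory lookup would consume the plain options, and an unexpected
    // "type" entry there would make discovery fail.
    static Map<String, String> toLegacyProperties(LoadModuleOperationSketch op) {
        Map<String, String> props = new HashMap<>(op.getOptions());
        props.put("type", op.getModuleName());
        return props;
    }
}
```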

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
##########
@@ -505,6 +503,79 @@ class TableEnvironmentTest {
       tableResult.collect())
   }
 
+  @Test
+  def testExecuteSqlWithLoadModule: Unit = {
+    val result = tableEnv.executeSql("LOAD MODULE dummy")
+    assertEquals(ResultKind.SUCCESS, result.getResultKind)
+    assert(tableEnv.listModules().sameElements(Array[String]("core", "dummy")))
+  }
+
+  @Test
+  def testExecuteSqlWithLoadParameterizedModule(): Unit = {
+    val statement1 =
+      """
+        |LOAD MODULE dummy WITH (
+        |  'dummy-version' = '1'
+        |)
+      """.stripMargin
+    val result = tableEnv.executeSql(statement1)
+    assertEquals(ResultKind.SUCCESS, result.getResultKind)
+    assert(tableEnv.listModules().sameElements(Array[String]("core", "dummy")))
+
+    val statement2 =
+      """
+        |LOAD MODULE dummy WITH (
+        |'dummy-version' = '2'
+        |)
+      """.stripMargin
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage(
+      "Could not execute LOAD MODULE: (moduleName: [dummy], properties: [{dummy-version=2, " +
+        "type=dummy}]). A module with name 'dummy' already exists")
+    tableEnv.executeSql(statement2)
+  }
+
+  @Test
+  def testExecuteSqlWithLoadCaseSensitiveModuleName(): Unit = {
+    expectedException.expect(classOf[TableException])
+    expectedException.expectMessage(
+      "Could not execute LOAD MODULE: (moduleName: [Dummy], properties: [{dummy-version=1, " +
+        "type=Dummy}]). Could not find a suitable table factory for 'org.apache.flink.table" +
+        ".factories.ModuleFactory' in\nthe classpath.")
+    val statement1 =
+      """
+        |LOAD MODULE Dummy WITH (
+        |  'dummy-version' = '1'
+        |)
+      """.stripMargin
+    tableEnv.executeSql(statement1)
+
+    val statement2 =
+      """
+        |LOAD MODULE dummy WITH (
+        |'dummy-version' = '2'
+        |)
+      """.stripMargin
+    val result = tableEnv.executeSql(statement2)
+    assertEquals(ResultKind.SUCCESS, result.getResultKind)
+    assert(tableEnv.listModules().sameElements(Array[String]("core", "dummy")))

Review comment:
       This code block is never reached, because `expectedException` makes the method return as soon as any exception is thrown. An alternative is to use `org.apache.flink.core.testutils.CommonTestUtils#assertThrows` or `containsCause`.
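
The failure mode and the suggested fix can be sketched like this (a self-contained stand-in for an assertThrows-style helper; `CommonTestUtils#assertThrows` has its own exact signature, which may differ): the expected exception is checked inline, so statements after the failing call still run.

```java
// Minimal stand-in for an assertThrows-style helper: it catches the
// expected exception inline instead of aborting the whole test method,
// so the assertions that follow are still executed.
public class AssertThrowsSketch {

    interface ThrowingRunnable {
        void run() throws Exception;
    }

    static void assertThrowsWithMessage(
            Class<? extends Throwable> expected, String fragment, ThrowingRunnable code) {
        try {
            code.run();
        } catch (Throwable t) {
            if (expected.isInstance(t)
                    && t.getMessage() != null
                    && t.getMessage().contains(fragment)) {
                return; // the expected failure was observed; keep going
            }
            throw new AssertionError("Unexpected exception: " + t, t);
        }
        throw new AssertionError("Expected " + expected.getName() + " but nothing was thrown");
    }

    public static void main(String[] args) {
        assertThrowsWithMessage(
                IllegalStateException.class,
                "already exists",
                () -> {
                    throw new IllegalStateException("A module with name 'dummy' already exists");
                });
        // Unlike an ExpectedException rule, execution reaches this point,
        // so follow-up assertions are actually run.
        System.out.println("assertions after the expected failure still run");
    }
}
```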

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
##########
@@ -205,6 +211,10 @@ private SqlToOperationConverter(FlinkPlannerImpl flinkPlanner, CatalogManager ca
                 throw new ValidationException("Partial inserts are not supported");
             }
             return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated));
+        } else if (validated instanceof SqlLoadModule) {

Review comment:
       Could you please revert the changes to the old planner? We don't need to support new features in the old planner; otherwise it adds a lot of maintenance burden.

##########
File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
##########
@@ -1618,6 +1620,46 @@ public void testCreateFunctionWithHiveCatalog() throws Exception {
         executor.closeSession(sessionId);
     }
 
+    @Test
+    public void testLoadModule() throws Exception {
+        final Executor executor =
+                createModifiedExecutor(
+                        MODULES_ENVIRONMENT_FILE, clusterClient, createModuleReplaceVars());
+        final SessionContext session = new SessionContext("test-session", new Environment());
+        String sessionId = executor.openSession(session);
+        assertEquals("test-session", sessionId);
+
+        exception.expect(SqlExecutionException.class);
+        exception.expectMessage("Could not execute statement: load module core");

Review comment:
       ditto.

##########
File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
##########
@@ -1618,6 +1620,46 @@ public void testCreateFunctionWithHiveCatalog() throws Exception {
         executor.closeSession(sessionId);
     }
 
+    @Test
+    public void testLoadModule() throws Exception {
+        final Executor executor =
+                createModifiedExecutor(
+                        MODULES_ENVIRONMENT_FILE, clusterClient, createModuleReplaceVars());
+        final SessionContext session = new SessionContext("test-session", new Environment());
+        String sessionId = executor.openSession(session);
+        assertEquals("test-session", sessionId);
+
+        exception.expect(SqlExecutionException.class);
+        exception.expectMessage("Could not execute statement: load module core");
+        executor.executeSql(sessionId, "load module core");
+
+        executor.executeSql(sessionId, "load module hive");
+        assertShowResult(
+                executor.executeSql(sessionId, "show modules"), Arrays.asList("core", "hive"));
+    }
+
+    @Test
+    public void testUnloadModule() throws Exception {
+        final Executor executor =
+                createModifiedExecutor(
+                        MODULES_ENVIRONMENT_FILE, clusterClient, createModuleReplaceVars());
+        final SessionContext session = new SessionContext("test-session", new Environment());
+        String sessionId = executor.openSession(session);
+        assertEquals("test-session", sessionId);
+
+        exception.expect(SqlExecutionException.class);
+        exception.expectMessage("Could not execute statement: unload module hive");

Review comment:
       ditto.

##########
File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
##########
@@ -282,19 +282,27 @@ public void testCommands() throws Exception {
                         // show current database
                         TestItem.validSql(
                                 "show current database", SqlCommand.SHOW_CURRENT_DATABASE),
+                        // load module
                         TestItem.validSql(
-                                "show  current         database", SqlCommand.SHOW_CURRENT_DATABASE),
-                        // show tables
-                        TestItem.validSql("SHOW TABLES;", SqlCommand.SHOW_TABLES),
-                        TestItem.validSql("  SHOW   TABLES   ;", SqlCommand.SHOW_TABLES),
-                        // show functions
-                        TestItem.validSql("SHOW FUNCTIONS;", SqlCommand.SHOW_FUNCTIONS),
-                        TestItem.validSql("  SHOW    FUNCTIONS   ", SqlCommand.SHOW_FUNCTIONS),
-                        // show modules
-                        TestItem.validSql("SHOW MODULES", SqlCommand.SHOW_MODULES)
-                                .cannotParseComment(),
-                        TestItem.validSql("  SHOW    MODULES   ", SqlCommand.SHOW_MODULES)
-                                .cannotParseComment(),
+                                "LOAD MODULE dummy", SqlCommand.LOAD_MODULE, "LOAD MODULE dummy"),
+                        TestItem.validSql(
+                                "LOAD MODULE dummy WITH ('dummy-version' = '1')",
+                                SqlCommand.LOAD_MODULE,
+                                "LOAD MODULE dummy WITH ('dummy-version' = '1')"),
+                        TestItem.invalidSql(
+                                "LOAD MODULE 'dummy'",
+                                SqlExecutionException.class,
+                                "Encountered \"\\'dummy\\'\""),
+                        TestItem.invalidSql(
+                                "LOAD MODULE my_dummy WITH ('type'='dummy')",
+                                SqlExecutionException.class,
+                                "Property 'type' = 'dummy' is not supported, please remove it "
+                                        + "and rename module to 'dummy' and try again"),
+                        // unload module
+                        TestItem.validSql(
+                                "UNLOAD MODULE dummy",
Review comment:
       Could you add tests that use a reserved keyword (with backticks) as the module name?

##########
File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
##########
@@ -1618,6 +1620,46 @@ public void testCreateFunctionWithHiveCatalog() throws Exception {
         executor.closeSession(sessionId);
     }
 
+    @Test
+    public void testLoadModule() throws Exception {
+        final Executor executor =
+                createModifiedExecutor(
+                        MODULES_ENVIRONMENT_FILE, clusterClient, createModuleReplaceVars());
+        final SessionContext session = new SessionContext("test-session", new Environment());
+        String sessionId = executor.openSession(session);
+        assertEquals("test-session", sessionId);
+
+        exception.expect(SqlExecutionException.class);
+        exception.expectMessage("Could not execute statement: load module core");
+        executor.executeSql(sessionId, "load module core");
+
+        executor.executeSql(sessionId, "load module hive");
+        assertShowResult(
+                executor.executeSql(sessionId, "show modules"), Arrays.asList("core", "hive"));

Review comment:
       It would be better to execute a query that uses Hive functions after this. That verifies the Hive module is actually loaded, and this is an integration test.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to