wuchong commented on a change in pull request #14895:
URL: https://github.com/apache/flink/pull/14895#discussion_r573404988



##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
##########
@@ -718,12 +727,20 @@ void createFunction(
     String[] listCatalogs();
 
     /**
-     * Gets an array of names of all modules in this environment in the loaded 
order.
+     * Gets an array of names of all used modules in this environment in the 
most-recent use order.

Review comment:
       Not sure about "most-recent use order"; it sounds like LRU, as if a 
module that has been used recently gets higher priority. 
   
   What about "resolution order"? 

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleEntry.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.module;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/** A POJO to represent a module's name and use status. */
+public class ModuleEntry {

Review comment:
       Add `@PublicEvolving` annotation.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java
##########
@@ -45,99 +49,149 @@
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ModuleManager.class);
 
-    private LinkedHashMap<String, Module> modules;
+    private Map<String, Module> loadedModules;
+    private List<String> usedModules;
 
     public ModuleManager() {
-        this.modules = new LinkedHashMap<>();
-
-        modules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+        this.loadedModules = new HashMap<>();
+        this.usedModules = new ArrayList<>();
+        loadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+        usedModules.add(MODULE_TYPE_CORE);
     }
 
     /**
      * Load a module under a unique name. Modules will be kept in the loaded 
order, and new module
-     * will be added to the end. ValidationException is thrown when there is 
already a module with
-     * the same name.
+     * will be added to the left before the unused module and turn on use by 
default.
      *
      * @param name name of the module
      * @param module the module instance
+     * @throws ValidationException when there already exists a module with the 
same name
      */
     public void loadModule(String name, Module module) {
         checkArgument(
                 !StringUtils.isNullOrWhitespaceOnly(name), "name cannot be 
null or empty string");
         checkNotNull(module, "module cannot be null");
 
-        if (!modules.containsKey(name)) {
-            modules.put(name, module);
-
-            LOG.info("Loaded module {} from class {}", name, 
module.getClass().getName());
-        } else {
+        if (loadedModules.containsKey(name)) {
             throw new ValidationException(
                     String.format("A module with name %s already exists", 
name));
+        } else {
+            usedModules.add(name);
+            loadedModules.put(name, module);
+            LOG.info("Loaded module {} from class {}", name, 
module.getClass().getName());
         }
     }
 
     /**
-     * Unload a module with given name. ValidationException is thrown when 
there is no module with
-     * the given name.
+     * Unload a module with given name.
      *
      * @param name name of the module
+     * @throws ValidationException when there is no module with the given name
      */
     public void unloadModule(String name) {
-        if (modules.containsKey(name)) {
-            modules.remove(name);
-
-            LOG.info("Unloaded module {}", name);
+        if (loadedModules.containsKey(name)) {
+            loadedModules.remove(name);
+            boolean used = usedModules.remove(name);
+            LOG.info("Unloaded an {} module {}", used ? "used" : "unused", 
name);

Review comment:
       Could you add single quotes around the module name to improve 
readability? 
   The same applies to the other messages, including the exception messages. 

##########
File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ModuleMock.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.types.inference.utils.FunctionDefinitionMock;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+/** Mocking {@link Module} for tests. */
+public abstract class ModuleMock implements Module {
+    private static final Set<String> BUILT_IN_FUNCTIONS =
+            Collections.unmodifiableSet(new 
HashSet<>(Collections.singletonList("dummy")));
+    private final String type;
+    private final FunctionDefinitionMock functionDef;
+
+    public ModuleMock(String type) {
+        this.type = type;
+        functionDef = new FunctionDefinitionMock();
+        functionDef.functionKind = FunctionKind.OTHER;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    @Override
+    public Set<String> listFunctions() {
+        return BUILT_IN_FUNCTIONS;
+    }
+
+    @Override
+    public Optional<FunctionDefinition> getFunctionDefinition(String name) {
+        if (BUILT_IN_FUNCTIONS.contains(name)) {
+            return Optional.of(functionDef);
+        }
+        return Optional.empty();
+    }
+
+    /** A simple mock module with type "x". */
+    public static class ModuleX extends ModuleMock {

Review comment:
       Nit: I don't think we need the generic types `ModuleX`, `ModuleY`, and 
`ModuleZ`, because the `type` is still specified manually. If we keep these 
generic types, each one should pre-define its own `type`. 

##########
File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/module/ModuleManagerTest.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.module;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.utils.ModuleMock;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.flink.table.descriptors.CoreModuleDescriptorValidator.MODULE_TYPE_CORE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link ModuleManager}. */
+public class ModuleManagerTest extends TestLogger {
+    private static final Comparator<ModuleEntry> COMPARATOR =
+            Comparator.comparing(ModuleEntry::name);
+    private ModuleManager manager;
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    @Before
+    public void before() {
+        manager = new ModuleManager();
+    }
+
+    @Test
+    public void testLoadModuleTwice() {
+        // CoreModule is loaded by default
+        assertEquals(Collections.singletonList(MODULE_TYPE_CORE), 
manager.getUsedModules());
+        assertEquals(CoreModule.INSTANCE, 
manager.getLoadedModules().get(MODULE_TYPE_CORE));
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage("A module with name core already exists");
+        manager.loadModule(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+    }
+
+    @Test
+    public void testLoadModuleWithoutUnusedModulesExist() {
+        ModuleMock.ModuleX x = new ModuleMock.ModuleX("x");
+        ModuleMock.ModuleY y = new ModuleMock.ModuleY("y");
+        ModuleMock.ModuleZ z = new ModuleMock.ModuleZ("z");
+        manager.loadModule(x.getType(), x);
+        manager.loadModule(y.getType(), y);
+        manager.loadModule(z.getType(), z);
+
+        Map<String, Module> expectedLoadedModules = new HashMap<>();
+        expectedLoadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+        expectedLoadedModules.put("x", x);
+        expectedLoadedModules.put("y", y);
+        expectedLoadedModules.put("z", z);
+
+        assertEquals(Arrays.asList(MODULE_TYPE_CORE, "x", "y", "z"), 
manager.getUsedModules());
+        assertEquals(expectedLoadedModules, manager.getLoadedModules());
+    }
+
+    @Test
+    public void testLoadModuleWithUnusedModulesExist() {
+        ModuleMock.ModuleY y = new ModuleMock.ModuleY("y");
+        ModuleMock.ModuleZ z = new ModuleMock.ModuleZ("z");
+        manager.loadModule(y.getType(), y);
+        manager.loadModule(z.getType(), z);
+
+        Map<String, Module> expectedLoadedModules = new HashMap<>();
+        expectedLoadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+        expectedLoadedModules.put("y", y);
+        expectedLoadedModules.put("z", z);
+
+        assertEquals(Arrays.asList(MODULE_TYPE_CORE, "y", "z"), 
manager.getUsedModules());
+        assertEquals(expectedLoadedModules, manager.getLoadedModules());
+
+        // reset usedModules to mock module y and z are disabled
+        manager.getUsedModules().remove("z");
+        manager.getUsedModules().remove("y");
+
+        // load module x to test the order
+        ModuleMock.ModuleX x = new ModuleMock.ModuleX("x");
+        manager.loadModule(x.getType(), x);
+        expectedLoadedModules.put("x", x);
+
+        assertEquals(Arrays.asList(MODULE_TYPE_CORE, "x"), 
manager.getUsedModules());
+        assertEquals(expectedLoadedModules, manager.getLoadedModules());
+    }
+
+    @Test
+    public void testUnloadModuleTwice() {
+        assertEquals(Collections.singletonList(MODULE_TYPE_CORE), 
manager.getUsedModules());
+
+        manager.unloadModule(MODULE_TYPE_CORE);
+        assertEquals(Collections.emptyList(), manager.getUsedModules());
+        assertEquals(Collections.emptyMap(), manager.getLoadedModules());
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage("No module with name core exists");
+        manager.unloadModule(MODULE_TYPE_CORE);
+    }
+
+    @Test
+    public void testUseUnloadedModules() {
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage("No module with name x exists");
+        manager.useModules(MODULE_TYPE_CORE, "x");
+    }
+
+    @Test
+    public void testUseModulesWithDuplicateModuleName() {
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage("Module core appears more than once");
+        manager.useModules(MODULE_TYPE_CORE, MODULE_TYPE_CORE);
+    }
+
+    @Test
+    public void testUseModules() {
+        ModuleMock.ModuleX x = new ModuleMock.ModuleX("x");
+        ModuleMock.ModuleY y = new ModuleMock.ModuleY("y");
+        ModuleMock.ModuleZ z = new ModuleMock.ModuleZ("z");
+        manager.loadModule(x.getType(), x);
+        manager.loadModule(y.getType(), y);
+        manager.loadModule(z.getType(), z);
+
+        assertEquals(Arrays.asList(MODULE_TYPE_CORE, "x", "y", "z"), 
manager.getUsedModules());
+
+        // test order for used modules
+        manager.useModules("z", MODULE_TYPE_CORE);
+        assertEquals(Arrays.asList("z", MODULE_TYPE_CORE), 
manager.getUsedModules());
+
+        // test unmentioned modules are still loaded
+        Map<String, Module> expectedLoadedModules = new HashMap<>();
+        expectedLoadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+        expectedLoadedModules.put("x", x);
+        expectedLoadedModules.put("y", y);
+        expectedLoadedModules.put("z", z);
+        assertEquals(expectedLoadedModules, manager.getLoadedModules());
+    }
+
+    @Test
+    public void testListModules() {
+        ModuleMock.ModuleY y = new ModuleMock.ModuleY("y");
+        ModuleMock.ModuleZ z = new ModuleMock.ModuleZ("z");
+        manager.loadModule("y", y);
+        manager.loadModule("z", z);
+        manager.useModules("z", "y");
+
+        assertEquals(Arrays.asList("z", "y"), manager.listModules());
+    }
+
+    @Test
+    public void testListFullModules() {
+        ModuleMock.ModuleZ x = new ModuleMock.ModuleZ("x");
+        ModuleMock.ModuleY y = new ModuleMock.ModuleY("y");
+        ModuleMock.ModuleZ z = new ModuleMock.ModuleZ("z");
+
+        manager.loadModule("y", y);
+        manager.loadModule("x", x);
+        manager.loadModule("z", z);
+        manager.useModules("z", "y");
+
+        assertEquals(
+                getExpectedModuleEntries(2, "z", "y", MODULE_TYPE_CORE, "x"),
+                getActualModuleEntries());

Review comment:
       What do you think about making `ModuleManager.loadedModules` a 
`LinkedHashMap`? This would provide a deterministic result for `listFullModules` 
and make testing easier. 
   
   If we do this, we should add a comment on `loadedModules` to explain why 
we use a `LinkedHashMap`. 

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java
##########
@@ -45,99 +49,149 @@
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ModuleManager.class);
 
-    private LinkedHashMap<String, Module> modules;
+    private Map<String, Module> loadedModules;
+    private List<String> usedModules;
 
     public ModuleManager() {
-        this.modules = new LinkedHashMap<>();
-
-        modules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+        this.loadedModules = new HashMap<>();
+        this.usedModules = new ArrayList<>();
+        loadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+        usedModules.add(MODULE_TYPE_CORE);
     }
 
     /**
      * Load a module under a unique name. Modules will be kept in the loaded 
order, and new module
-     * will be added to the end. ValidationException is thrown when there is 
already a module with
-     * the same name.
+     * will be added to the left before the unused module and turn on use by 
default.
      *
      * @param name name of the module
      * @param module the module instance
+     * @throws ValidationException when there already exists a module with the 
same name
      */
     public void loadModule(String name, Module module) {
         checkArgument(
                 !StringUtils.isNullOrWhitespaceOnly(name), "name cannot be 
null or empty string");
         checkNotNull(module, "module cannot be null");
 
-        if (!modules.containsKey(name)) {
-            modules.put(name, module);
-
-            LOG.info("Loaded module {} from class {}", name, 
module.getClass().getName());
-        } else {
+        if (loadedModules.containsKey(name)) {
             throw new ValidationException(
                     String.format("A module with name %s already exists", 
name));
+        } else {
+            usedModules.add(name);
+            loadedModules.put(name, module);
+            LOG.info("Loaded module {} from class {}", name, 
module.getClass().getName());
         }
     }
 
     /**
-     * Unload a module with given name. ValidationException is thrown when 
there is no module with
-     * the given name.
+     * Unload a module with given name.
      *
      * @param name name of the module
+     * @throws ValidationException when there is no module with the given name
      */
     public void unloadModule(String name) {
-        if (modules.containsKey(name)) {
-            modules.remove(name);
-
-            LOG.info("Unloaded module {}", name);
+        if (loadedModules.containsKey(name)) {
+            loadedModules.remove(name);
+            boolean used = usedModules.remove(name);
+            LOG.info("Unloaded an {} module {}", used ? "used" : "unused", 
name);
         } else {
             throw new ValidationException(String.format("No module with name 
%s exists", name));
         }
     }
 
     /**
-     * Get names of all modules loaded.
+     * Enable modules in use with declared name order. Modules that have been 
loaded but not exist
+     * in names varargs will become unused. There is no guarantee on unused 
module's order.
      *
-     * @return a list of names of modules loaded
+     * @param names module names to be used
+     * @throws ValidationException when module names contain an unloaded name
+     */
+    public void useModules(String... names) {
+        checkNotNull(names, "names cannot be null");
+        Set<String> deduplicateNames = new HashSet<>();
+        for (String name : names) {
+            if (!loadedModules.containsKey(name)) {
+                throw new ValidationException(String.format("No module with 
name %s exists", name));
+            }
+            if (!deduplicateNames.add(name)) {
+                throw new ValidationException(
+                        String.format("Module %s appears more than once", 
name));
+            }
+        }
+        usedModules.clear();
+        usedModules.addAll(Arrays.asList(names));
+    }
+
+    /**
+     * Get names of all used modules in most-recent use order.
+     *
+     * @return a list of names of used modules
      */
     public List<String> listModules() {
-        return new ArrayList<>(modules.keySet());
+        return usedModules;

Review comment:
       Return a new ArrayList to prevent `usedModules` from being updated by users. 

##########
File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/module/ModuleManagerTest.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.module;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.utils.ModuleMock;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.flink.table.descriptors.CoreModuleDescriptorValidator.MODULE_TYPE_CORE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link ModuleManager}. */
+public class ModuleManagerTest extends TestLogger {
+    private static final Comparator<ModuleEntry> COMPARATOR =
+            Comparator.comparing(ModuleEntry::name);
+    private ModuleManager manager;
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    @Before
+    public void before() {
+        manager = new ModuleManager();
+    }
+
+    @Test
+    public void testLoadModuleTwice() {
+        // CoreModule is loaded by default
+        assertEquals(Collections.singletonList(MODULE_TYPE_CORE), 
manager.getUsedModules());
+        assertEquals(CoreModule.INSTANCE, 
manager.getLoadedModules().get(MODULE_TYPE_CORE));
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage("A module with name core already exists");
+        manager.loadModule(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+    }
+
+    @Test
+    public void testLoadModuleWithoutUnusedModulesExist() {
+        ModuleMock.ModuleX x = new ModuleMock.ModuleX("x");
+        ModuleMock.ModuleY y = new ModuleMock.ModuleY("y");
+        ModuleMock.ModuleZ z = new ModuleMock.ModuleZ("z");
+        manager.loadModule(x.getType(), x);
+        manager.loadModule(y.getType(), y);
+        manager.loadModule(z.getType(), z);
+
+        Map<String, Module> expectedLoadedModules = new HashMap<>();
+        expectedLoadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+        expectedLoadedModules.put("x", x);
+        expectedLoadedModules.put("y", y);
+        expectedLoadedModules.put("z", z);
+
+        assertEquals(Arrays.asList(MODULE_TYPE_CORE, "x", "y", "z"), 
manager.getUsedModules());
+        assertEquals(expectedLoadedModules, manager.getLoadedModules());
+    }
+
+    @Test
+    public void testLoadModuleWithUnusedModulesExist() {
+        ModuleMock.ModuleY y = new ModuleMock.ModuleY("y");
+        ModuleMock.ModuleZ z = new ModuleMock.ModuleZ("z");
+        manager.loadModule(y.getType(), y);
+        manager.loadModule(z.getType(), z);
+
+        Map<String, Module> expectedLoadedModules = new HashMap<>();
+        expectedLoadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+        expectedLoadedModules.put("y", y);
+        expectedLoadedModules.put("z", z);
+
+        assertEquals(Arrays.asList(MODULE_TYPE_CORE, "y", "z"), 
manager.getUsedModules());
+        assertEquals(expectedLoadedModules, manager.getLoadedModules());
+
+        // reset usedModules to mock module y and z are disabled
+        manager.getUsedModules().remove("z");
+        manager.getUsedModules().remove("y");

Review comment:
       This is a bit of a hack. We can use `manager.useModules(core)` to achieve 
the same thing.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to