wuchong commented on a change in pull request #14895: URL: https://github.com/apache/flink/pull/14895#discussion_r573404988
########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java ########## @@ -718,12 +727,20 @@ void createFunction( String[] listCatalogs(); /** - * Gets an array of names of all modules in this environment in the loaded order. + * Gets an array of names of all used modules in this environment in the most-recent use order. Review comment: Not sure about "most-recent use order", it sounds like LRU that the module been used have higher priority. What about "resolution order"? ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleEntry.java ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.module; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +/** A POJO to represent a module's name and use status. */ +public class ModuleEntry { Review comment: Add `@PublicEvolving` annotation. ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java ########## @@ -45,99 +49,149 @@ private static final Logger LOG = LoggerFactory.getLogger(ModuleManager.class); - private LinkedHashMap<String, Module> modules; + private Map<String, Module> loadedModules; + private List<String> usedModules; public ModuleManager() { - this.modules = new LinkedHashMap<>(); - - modules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE); + this.loadedModules = new HashMap<>(); + this.usedModules = new ArrayList<>(); + loadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE); + usedModules.add(MODULE_TYPE_CORE); } /** * Load a module under a unique name. Modules will be kept in the loaded order, and new module - * will be added to the end. ValidationException is thrown when there is already a module with - * the same name. + * will be added to the left before the unused module and turn on use by default. * * @param name name of the module * @param module the module instance + * @throws ValidationException when there already exists a module with the same name */ public void loadModule(String name, Module module) { checkArgument( !StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty string"); checkNotNull(module, "module cannot be null"); - if (!modules.containsKey(name)) { - modules.put(name, module); - - LOG.info("Loaded module {} from class {}", name, module.getClass().getName()); - } else { + if (loadedModules.containsKey(name)) { throw new ValidationException( String.format("A module with name %s already exists", name)); + } else { + usedModules.add(name); + loadedModules.put(name, module); + LOG.info("Loaded module {} from class {}", name, module.getClass().getName()); } } /** - * Unload a module with given name. ValidationException is thrown when there is no module with - * the given name. + * Unload a module with given name. * * @param name name of the module + * @throws ValidationException when there is no module with the given name */ public void unloadModule(String name) { - if (modules.containsKey(name)) { - modules.remove(name); - - LOG.info("Unloaded module {}", name); + if (loadedModules.containsKey(name)) { + loadedModules.remove(name); + boolean used = usedModules.remove(name); + LOG.info("Unloaded an {} module {}", used ? "used" : "unused", name); Review comment: Could you add single quotes around the module name to improve the readability? The same to the others including the exception messages. ########## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ModuleMock.java ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils; + +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.FunctionKind; +import org.apache.flink.table.module.Module; +import org.apache.flink.table.types.inference.utils.FunctionDefinitionMock; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +/** Mocking {@link Module} for tests. */ +public abstract class ModuleMock implements Module { + private static final Set<String> BUILT_IN_FUNCTIONS = + Collections.unmodifiableSet(new HashSet<>(Collections.singletonList("dummy"))); + private final String type; + private final FunctionDefinitionMock functionDef; + + public ModuleMock(String type) { + this.type = type; + functionDef = new FunctionDefinitionMock(); + functionDef.functionKind = FunctionKind.OTHER; + } + + public String getType() { + return type; + } + + @Override + public Set<String> listFunctions() { + return BUILT_IN_FUNCTIONS; + } + + @Override + public Optional<FunctionDefinition> getFunctionDefinition(String name) { + if (BUILT_IN_FUNCTIONS.contains(name)) { + return Optional.of(functionDef); + } + return Optional.empty(); + } + + /** A simple mock module with type "x". */ + public static class ModuleX extends ModuleMock { Review comment: Nit: I think we don't need generic types `ModuleX`, `ModuleY`, `ModuleZ`, because the `type` is still manually specified. If we have these generic types, the type should be pre-defined. ########## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/module/ModuleManagerTest.java ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.module; + +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.utils.ModuleMock; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.table.descriptors.CoreModuleDescriptorValidator.MODULE_TYPE_CORE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** Tests for {@link ModuleManager}. */ +public class ModuleManagerTest extends TestLogger { + private static final Comparator<ModuleEntry> COMPARATOR = + Comparator.comparing(ModuleEntry::name); + private ModuleManager manager; + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Before + public void before() { + manager = new ModuleManager(); + } + + @Test + public void testLoadModuleTwice() { + // CoreModule is loaded by default + assertEquals(Collections.singletonList(MODULE_TYPE_CORE), manager.getUsedModules()); + assertEquals(CoreModule.INSTANCE, manager.getLoadedModules().get(MODULE_TYPE_CORE)); + + thrown.expect(ValidationException.class); + thrown.expectMessage("A module with name core already exists"); + manager.loadModule(MODULE_TYPE_CORE, CoreModule.INSTANCE); + } + + @Test + public void testLoadModuleWithoutUnusedModulesExist() { + ModuleMock.ModuleX x = new ModuleMock.ModuleX("x"); + ModuleMock.ModuleY y = new ModuleMock.ModuleY("y"); + ModuleMock.ModuleZ z = new ModuleMock.ModuleZ("z"); + manager.loadModule(x.getType(), x); + manager.loadModule(y.getType(), y); + manager.loadModule(z.getType(), z); + + Map<String, Module> expectedLoadedModules = new HashMap<>(); + expectedLoadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE); + expectedLoadedModules.put("x", x); + expectedLoadedModules.put("y", y); + expectedLoadedModules.put("z", z); + + assertEquals(Arrays.asList(MODULE_TYPE_CORE, "x", "y", "z"), manager.getUsedModules()); + assertEquals(expectedLoadedModules, manager.getLoadedModules()); + } + + @Test + public void testLoadModuleWithUnusedModulesExist() { + ModuleMock.ModuleY y = new ModuleMock.ModuleY("y"); + ModuleMock.ModuleZ z = new ModuleMock.ModuleZ("z"); + manager.loadModule(y.getType(), y); + manager.loadModule(z.getType(), z); + + Map<String, Module> expectedLoadedModules = new HashMap<>(); + expectedLoadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE); + expectedLoadedModules.put("y", y); + expectedLoadedModules.put("z", z); + + assertEquals(Arrays.asList(MODULE_TYPE_CORE, "y", "z"), manager.getUsedModules()); + assertEquals(expectedLoadedModules, manager.getLoadedModules()); + + // reset usedModules to mock module y and z are disabled + manager.getUsedModules().remove("z"); + manager.getUsedModules().remove("y"); + + // load module x to test the order + ModuleMock.ModuleX x = new ModuleMock.ModuleX("x"); + manager.loadModule(x.getType(), x); + expectedLoadedModules.put("x", x); + + assertEquals(Arrays.asList(MODULE_TYPE_CORE, "x"), manager.getUsedModules()); + assertEquals(expectedLoadedModules, manager.getLoadedModules()); + } + + @Test + public void testUnloadModuleTwice() { + assertEquals(Collections.singletonList(MODULE_TYPE_CORE), manager.getUsedModules()); + + manager.unloadModule(MODULE_TYPE_CORE); + assertEquals(Collections.emptyList(), manager.getUsedModules()); + assertEquals(Collections.emptyMap(), manager.getLoadedModules()); + + thrown.expect(ValidationException.class); + thrown.expectMessage("No module with name core exists"); + manager.unloadModule(MODULE_TYPE_CORE); + } + + @Test + public void testUseUnloadedModules() { + thrown.expect(ValidationException.class); + thrown.expectMessage("No module with name x exists"); + manager.useModules(MODULE_TYPE_CORE, "x"); + } + + @Test + public void testUseModulesWithDuplicateModuleName() { + thrown.expect(ValidationException.class); + thrown.expectMessage("Module core appears more than once"); + manager.useModules(MODULE_TYPE_CORE, MODULE_TYPE_CORE); + } + + @Test + public void testUseModules() { + ModuleMock.ModuleX x = new ModuleMock.ModuleX("x"); + ModuleMock.ModuleY y = new ModuleMock.ModuleY("y"); + ModuleMock.ModuleZ z = new ModuleMock.ModuleZ("z"); + manager.loadModule(x.getType(), x); + manager.loadModule(y.getType(), y); + manager.loadModule(z.getType(), z); + + assertEquals(Arrays.asList(MODULE_TYPE_CORE, "x", "y", "z"), manager.getUsedModules()); + + // test order for used modules + manager.useModules("z", MODULE_TYPE_CORE); + assertEquals(Arrays.asList("z", MODULE_TYPE_CORE), manager.getUsedModules()); + + // test unmentioned modules are still loaded + Map<String, Module> expectedLoadedModules = new HashMap<>(); + expectedLoadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE); + expectedLoadedModules.put("x", x); + expectedLoadedModules.put("y", y); + expectedLoadedModules.put("z", z); + assertEquals(expectedLoadedModules, manager.getLoadedModules()); + } + + @Test + public void testListModules() { + ModuleMock.ModuleY y = new ModuleMock.ModuleY("y"); + ModuleMock.ModuleZ z = new ModuleMock.ModuleZ("z"); + manager.loadModule("y", y); + manager.loadModule("z", z); + manager.useModules("z", "y"); + + assertEquals(Arrays.asList("z", "y"), manager.listModules()); + } + + @Test + public void testListFullModules() { + ModuleMock.ModuleZ x = new ModuleMock.ModuleZ("x"); + ModuleMock.ModuleY y = new ModuleMock.ModuleY("y"); + ModuleMock.ModuleZ z = new ModuleMock.ModuleZ("z"); + + manager.loadModule("y", y); + manager.loadModule("x", x); + manager.loadModule("z", z); + manager.useModules("z", "y"); + + assertEquals( + getExpectedModuleEntries(2, "z", "y", MODULE_TYPE_CORE, "x"), + getActualModuleEntries()); Review comment: What do you think about making `ModuleManager.loadedModules` to be `LinkedHashMap`? This can provide deterministic result for `listFullModules` and make the testing easier. If we do this, we should add a comment on the `loadedModules` to explain why we use `LinkedHashMap`. ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java ########## @@ -45,99 +49,149 @@ private static final Logger LOG = LoggerFactory.getLogger(ModuleManager.class); - private LinkedHashMap<String, Module> modules; + private Map<String, Module> loadedModules; + private List<String> usedModules; public ModuleManager() { - this.modules = new LinkedHashMap<>(); - - modules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE); + this.loadedModules = new HashMap<>(); + this.usedModules = new ArrayList<>(); + loadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE); + usedModules.add(MODULE_TYPE_CORE); } /** * Load a module under a unique name. Modules will be kept in the loaded order, and new module - * will be added to the end. ValidationException is thrown when there is already a module with - * the same name. + * will be added to the left before the unused module and turn on use by default. * * @param name name of the module * @param module the module instance + * @throws ValidationException when there already exists a module with the same name */ public void loadModule(String name, Module module) { checkArgument( !StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty string"); checkNotNull(module, "module cannot be null"); - if (!modules.containsKey(name)) { - modules.put(name, module); - - LOG.info("Loaded module {} from class {}", name, module.getClass().getName()); - } else { + if (loadedModules.containsKey(name)) { throw new ValidationException( String.format("A module with name %s already exists", name)); + } else { + usedModules.add(name); + loadedModules.put(name, module); + LOG.info("Loaded module {} from class {}", name, module.getClass().getName()); } } /** - * Unload a module with given name. ValidationException is thrown when there is no module with - * the given name. + * Unload a module with given name. * * @param name name of the module + * @throws ValidationException when there is no module with the given name */ public void unloadModule(String name) { - if (modules.containsKey(name)) { - modules.remove(name); - - LOG.info("Unloaded module {}", name); + if (loadedModules.containsKey(name)) { + loadedModules.remove(name); + boolean used = usedModules.remove(name); + LOG.info("Unloaded an {} module {}", used ? "used" : "unused", name); } else { throw new ValidationException(String.format("No module with name %s exists", name)); } } /** - * Get names of all modules loaded. + * Enable modules in use with declared name order. Modules that have been loaded but not exist + * in names varargs will become unused. There is no guarantee on unused module's order. * - * @return a list of names of modules loaded + * @param names module names to be used + * @throws ValidationException when module names contain an unloaded name + */ + public void useModules(String... names) { + checkNotNull(names, "names cannot be null"); + Set<String> deduplicateNames = new HashSet<>(); + for (String name : names) { + if (!loadedModules.containsKey(name)) { + throw new ValidationException(String.format("No module with name %s exists", name)); + } + if (!deduplicateNames.add(name)) { + throw new ValidationException( + String.format("Module %s appears more than once", name)); + } + } + usedModules.clear(); + usedModules.addAll(Arrays.asList(names)); + } + + /** + * Get names of all used modules in most-recent use order. + * + * @return a list of names of used modules */ public List<String> listModules() { - return new ArrayList<>(modules.keySet()); + return usedModules; Review comment: Return a new ArrayList to avoid the `usedModules` been updated by users. ########## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/module/ModuleManagerTest.java ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.module; + +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.utils.ModuleMock; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.table.descriptors.CoreModuleDescriptorValidator.MODULE_TYPE_CORE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** Tests for {@link ModuleManager}. */ +public class ModuleManagerTest extends TestLogger { + private static final Comparator<ModuleEntry> COMPARATOR = + Comparator.comparing(ModuleEntry::name); + private ModuleManager manager; + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Before + public void before() { + manager = new ModuleManager(); + } + + @Test + public void testLoadModuleTwice() { + // CoreModule is loaded by default + assertEquals(Collections.singletonList(MODULE_TYPE_CORE), manager.getUsedModules()); + assertEquals(CoreModule.INSTANCE, manager.getLoadedModules().get(MODULE_TYPE_CORE)); + + thrown.expect(ValidationException.class); + thrown.expectMessage("A module with name core already exists"); + manager.loadModule(MODULE_TYPE_CORE, CoreModule.INSTANCE); + } + + @Test + public void testLoadModuleWithoutUnusedModulesExist() { + ModuleMock.ModuleX x = new ModuleMock.ModuleX("x"); + ModuleMock.ModuleY y = new ModuleMock.ModuleY("y"); + ModuleMock.ModuleZ z = new ModuleMock.ModuleZ("z"); + manager.loadModule(x.getType(), x); + manager.loadModule(y.getType(), y); + manager.loadModule(z.getType(), z); + + Map<String, Module> expectedLoadedModules = new HashMap<>(); + expectedLoadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE); + expectedLoadedModules.put("x", x); + expectedLoadedModules.put("y", y); + expectedLoadedModules.put("z", z); + + assertEquals(Arrays.asList(MODULE_TYPE_CORE, "x", "y", "z"), manager.getUsedModules()); + assertEquals(expectedLoadedModules, manager.getLoadedModules()); + } + + @Test + public void testLoadModuleWithUnusedModulesExist() { + ModuleMock.ModuleY y = new ModuleMock.ModuleY("y"); + ModuleMock.ModuleZ z = new ModuleMock.ModuleZ("z"); + manager.loadModule(y.getType(), y); + manager.loadModule(z.getType(), z); + + Map<String, Module> expectedLoadedModules = new HashMap<>(); + expectedLoadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE); + expectedLoadedModules.put("y", y); + expectedLoadedModules.put("z", z); + + assertEquals(Arrays.asList(MODULE_TYPE_CORE, "y", "z"), manager.getUsedModules()); + assertEquals(expectedLoadedModules, manager.getLoadedModules()); + + // reset usedModules to mock module y and z are disabled + manager.getUsedModules().remove("z"); + manager.getUsedModules().remove("y"); Review comment: This is a bit hack. We can use `manager.useModules(core)` to achieve this. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org