This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 75327e0 [FLINK-21295][table-api] Support 'useModules' and 'listFullModules' in TableEnvironment and ModuleManager 75327e0 is described below commit 75327e0a14c69c739665a34d8d3a7705b0c11d35 Author: Jane <55568005+ladyfor...@users.noreply.github.com> AuthorDate: Fri Feb 12 22:26:43 2021 +0800 [FLINK-21295][table-api] Support 'useModules' and 'listFullModules' in TableEnvironment and ModuleManager This closes #14895 --- .../table/tests/test_environment_completeness.py | 2 + .../apache/flink/table/api/TableEnvironment.java | 21 +- .../table/api/internal/TableEnvironmentImpl.java | 11 ++ .../org/apache/flink/table/module/ModuleEntry.java | 69 +++++++ .../apache/flink/table/module/ModuleManager.java | 138 +++++++++---- .../flink/table/module/ModuleManagerTest.java | 217 +++++++++++++++++++++ .../org/apache/flink/table/utils/ModuleMock.java | 60 ++++++ .../src/test/resources/log4j2-test.properties | 28 +++ .../flink/table/api/internal/TableEnvImpl.scala | 12 +- .../flink/table/utils/MockTableEnvironment.scala | 7 +- 10 files changed, 518 insertions(+), 47 deletions(-) diff --git a/flink-python/pyflink/table/tests/test_environment_completeness.py b/flink-python/pyflink/table/tests/test_environment_completeness.py index 7cfd27e..a9fec44 100644 --- a/flink-python/pyflink/table/tests/test_environment_completeness.py +++ b/flink-python/pyflink/table/tests/test_environment_completeness.py @@ -43,6 +43,8 @@ class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTest 'create', 'loadModule', 'unloadModule', + 'useModules', + 'listFullModules', 'createTemporarySystemFunction', 'dropTemporarySystemFunction', 'createFunction', diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java index c736e69..fd9c1fa 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java @@ -28,6 +28,7 @@ import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.UserDefinedFunction; import org.apache.flink.table.module.Module; +import org.apache.flink.table.module.ModuleEntry; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.TableSource; import org.apache.flink.table.types.AbstractDataType; @@ -384,6 +385,14 @@ public interface TableEnvironment { void loadModule(String moduleName, Module module); /** + * Enable modules in use with declared name order. Modules that have been loaded but not exist + * in names varargs will become unused. + * + * @param moduleNames module names to be used + */ + void useModules(String... moduleNames); + + /** * Unloads a {@link Module} with given name. ValidationException is thrown when there is no * module with the given name. * @@ -718,13 +727,21 @@ public interface TableEnvironment { String[] listCatalogs(); /** - * Gets an array of names of all modules in this environment in the loaded order. + * Gets an array of names of all used modules in this environment in resolution order. * - * @return A list of the names of all modules in the loaded order. + * @return A list of the names of used modules in resolution order. */ String[] listModules(); /** + * Gets an array of all loaded modules with use status in this environment. Used modules are + * kept in resolution order. + * + * @return A list of name and use status entries of all loaded modules. + */ + ModuleEntry[] listFullModules(); + + /** * Gets the names of all databases registered in the current catalog. * * @return A list of the names of all registered databases in the current catalog. diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 806d271..c4d254a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -79,6 +79,7 @@ import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.UserDefinedFunction; import org.apache.flink.table.functions.UserDefinedFunctionHelper; import org.apache.flink.table.module.Module; +import org.apache.flink.table.module.ModuleEntry; import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.operations.CatalogQueryOperation; import org.apache.flink.table.operations.CatalogSinkModifyOperation; @@ -371,6 +372,11 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { } @Override + public void useModules(String... moduleNames) { + moduleManager.useModules(moduleNames); + } + + @Override public void unloadModule(String moduleName) { moduleManager.unloadModule(moduleName); } @@ -540,6 +546,11 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { } @Override + public ModuleEntry[] listFullModules() { + return moduleManager.listFullModules().toArray(new ModuleEntry[0]); + } + + @Override public String[] listDatabases() { return catalogManager .getCatalog(catalogManager.getCurrentCatalog()) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleEntry.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleEntry.java new file mode 100644 index 0000000..d68cb97 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleEntry.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.module; + +import org.apache.flink.annotation.PublicEvolving; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +/** A POJO to represent a module's name and use status. */ +@PublicEvolving +public class ModuleEntry { + private final String name; + private final boolean used; + + public ModuleEntry(String name, boolean used) { + this.name = name; + this.used = used; + } + + public String name() { + return name; + } + + public boolean used() { + return used; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + ModuleEntry entry = (ModuleEntry) o; + + return new EqualsBuilder().append(used, entry.used).append(name, entry.name).isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37).append(name).append(used).toHashCode(); + } + + @Override + public String toString() { + return "ModuleEntry{" + "name='" + name + '\'' + ", used=" + used + '}'; + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java index b4bdcd6..b0f5b5f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java @@ -18,6 +18,7 @@ package org.apache.flink.table.module; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.util.StringUtils; @@ -26,6 +27,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -45,99 +49,153 @@ public class ModuleManager { private static final Logger LOG = LoggerFactory.getLogger(ModuleManager.class); - private LinkedHashMap<String, Module> modules; + /** To keep {@link #listFullModules()} deterministic. */ + private LinkedHashMap<String, Module> loadedModules; - public ModuleManager() { - this.modules = new LinkedHashMap<>(); + /** Keep tracking used modules with resolution order. */ + private List<String> usedModules; - modules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE); + public ModuleManager() { + this.loadedModules = new LinkedHashMap<>(); + this.usedModules = new ArrayList<>(); + loadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE); + usedModules.add(MODULE_TYPE_CORE); } /** * Load a module under a unique name. Modules will be kept in the loaded order, and new module - * will be added to the end. ValidationException is thrown when there is already a module with - * the same name. + * will be added to the left before the unused module and turn on use by default. * * @param name name of the module * @param module the module instance + * @throws ValidationException when there already exists a module with the same name */ public void loadModule(String name, Module module) { checkArgument( !StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty string"); checkNotNull(module, "module cannot be null"); - if (!modules.containsKey(name)) { - modules.put(name, module); - - LOG.info("Loaded module {} from class {}", name, module.getClass().getName()); - } else { + if (loadedModules.containsKey(name)) { throw new ValidationException( - String.format("A module with name %s already exists", name)); + String.format("A module with name '%s' already exists", name)); + } else { + usedModules.add(name); + loadedModules.put(name, module); + LOG.info("Loaded module '{}' from class {}", name, module.getClass().getName()); } } /** - * Unload a module with given name. ValidationException is thrown when there is no module with - * the given name. + * Unload a module with given name. * * @param name name of the module + * @throws ValidationException when there is no module with the given name */ public void unloadModule(String name) { - if (modules.containsKey(name)) { - modules.remove(name); - - LOG.info("Unloaded module {}", name); + if (loadedModules.containsKey(name)) { + loadedModules.remove(name); + boolean used = usedModules.remove(name); + LOG.info("Unloaded an {} module '{}'", used ? "used" : "unused", name); } else { - throw new ValidationException(String.format("No module with name %s exists", name)); + throw new ValidationException(String.format("No module with name '%s' exists", name)); + } + } + + /** + * Enable modules in use with declared name order. Modules that have been loaded but not exist + * in names varargs will become unused. + * + * @param names module names to be used + * @throws ValidationException when module names contain an unloaded name + */ + public void useModules(String... names) { + checkNotNull(names, "names cannot be null"); + Set<String> deduplicateNames = new HashSet<>(); + for (String name : names) { + if (!loadedModules.containsKey(name)) { + throw new ValidationException( + String.format("No module with name '%s' exists", name)); + } + if (!deduplicateNames.add(name)) { + throw new ValidationException( + String.format("Module '%s' appears more than once", name)); + } } + usedModules.clear(); + usedModules.addAll(Arrays.asList(names)); } /** - * Get names of all modules loaded. + * Get names of all used modules in resolution order. * - * @return a list of names of modules loaded + * @return a list of names of used modules */ public List<String> listModules() { - return new ArrayList<>(modules.keySet()); + return new ArrayList<>(usedModules); + } + + /** + * Get all loaded modules with use status. Modules in use status are returned in resolution + * order. + * + * @return a list of module entries with module name and use status + */ + public List<ModuleEntry> listFullModules() { + // keep the order for used modules + List<ModuleEntry> moduleEntries = + usedModules.stream() + .map(name -> new ModuleEntry(name, true)) + .collect(Collectors.toList()); + loadedModules.keySet().stream() + .filter(name -> !usedModules.contains(name)) + .forEach(name -> moduleEntries.add(new ModuleEntry(name, false))); + return moduleEntries; } /** - * Get names of all functions from all modules. + * Get names of all functions from used modules. * - * @return a set of names of registered modules. + * @return a set of function names of used modules */ public Set<String> listFunctions() { - return modules.values().stream() - .map(m -> m.listFunctions()) - .flatMap(n -> n.stream()) + return usedModules.stream() + .map(name -> loadedModules.get(name).listFunctions()) + .flatMap(Collection::stream) .collect(Collectors.toSet()); } /** * Get an optional of {@link FunctionDefinition} by a given name. Function will be resolved to - * modules in the loaded order, and the first match will be returned. If no match is found in - * all modules, return an optional. + * modules in the used order, and the first match will be returned. If no match is found in all + * modules, return an optional. * * @param name name of the function * @return an optional of {@link FunctionDefinition} */ public Optional<FunctionDefinition> getFunctionDefinition(String name) { - Optional<Map.Entry<String, Module>> result = - modules.entrySet().stream() + Optional<String> module = + usedModules.stream() .filter( - p -> - p.getValue().listFunctions().stream() + n -> + loadedModules.get(n).listFunctions().stream() .anyMatch(e -> e.equalsIgnoreCase(name))) .findFirst(); + if (module.isPresent()) { + LOG.debug("Got FunctionDefinition '{}' from '{}' module.", name, module.get()); + return loadedModules.get(module.get()).getFunctionDefinition(name); + } + LOG.debug("Cannot find FunctionDefinition '{}' from any loaded modules.", name); - if (result.isPresent()) { - LOG.debug("Got FunctionDefinition '{}' from '{}' module.", name, result.get().getKey()); + return Optional.empty(); + } - return result.get().getValue().getFunctionDefinition(name); - } else { - LOG.debug("Cannot find FunctionDefinition '{}' from any loaded modules.", name); + @VisibleForTesting + List<String> getUsedModules() { + return usedModules; + } - return Optional.empty(); - } + @VisibleForTesting + Map<String, Module> getLoadedModules() { + return loadedModules; } } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/module/ModuleManagerTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/module/ModuleManagerTest.java new file mode 100644 index 0000000..189dc03 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/module/ModuleManagerTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.module; + +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.utils.ModuleMock; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.table.descriptors.CoreModuleDescriptorValidator.MODULE_TYPE_CORE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** Tests for {@link ModuleManager}. */ +public class ModuleManagerTest extends TestLogger { + private ModuleManager manager; + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Before + public void before() { + manager = new ModuleManager(); + } + + @Test + public void testLoadModuleTwice() { + // CoreModule is loaded by default + assertEquals(Collections.singletonList(MODULE_TYPE_CORE), manager.getUsedModules()); + assertEquals(CoreModule.INSTANCE, manager.getLoadedModules().get(MODULE_TYPE_CORE)); + + thrown.expect(ValidationException.class); + thrown.expectMessage("A module with name 'core' already exists"); + manager.loadModule(MODULE_TYPE_CORE, CoreModule.INSTANCE); + } + + @Test + public void testLoadModuleWithoutUnusedModulesExist() { + ModuleMock x = new ModuleMock("x"); + ModuleMock y = new ModuleMock("y"); + ModuleMock z = new ModuleMock("z"); + manager.loadModule(x.getType(), x); + manager.loadModule(y.getType(), y); + manager.loadModule(z.getType(), z); + + Map<String, Module> expectedLoadedModules = new HashMap<>(); + expectedLoadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE); + expectedLoadedModules.put("x", x); + expectedLoadedModules.put("y", y); + expectedLoadedModules.put("z", z); + + assertEquals(Arrays.asList(MODULE_TYPE_CORE, "x", "y", "z"), manager.getUsedModules()); + assertEquals(expectedLoadedModules, manager.getLoadedModules()); + } + + @Test + public void testLoadModuleWithUnusedModulesExist() { + ModuleMock y = new ModuleMock("y"); + ModuleMock z = new ModuleMock("z"); + manager.loadModule(y.getType(), y); + manager.loadModule(z.getType(), z); + + Map<String, Module> expectedLoadedModules = new HashMap<>(); + expectedLoadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE); + expectedLoadedModules.put("y", y); + expectedLoadedModules.put("z", z); + + assertEquals(Arrays.asList(MODULE_TYPE_CORE, "y", "z"), manager.getUsedModules()); + assertEquals(expectedLoadedModules, manager.getLoadedModules()); + + // disable module y and z + manager.useModules(MODULE_TYPE_CORE); + + // load module x to test the order + ModuleMock x = new ModuleMock("x"); + manager.loadModule(x.getType(), x); + expectedLoadedModules.put("x", x); + + assertEquals(Arrays.asList(MODULE_TYPE_CORE, "x"), manager.getUsedModules()); + assertEquals(expectedLoadedModules, manager.getLoadedModules()); + } + + @Test + public void testUnloadModuleTwice() { + assertEquals(Collections.singletonList(MODULE_TYPE_CORE), manager.getUsedModules()); + + manager.unloadModule(MODULE_TYPE_CORE); + assertEquals(Collections.emptyList(), manager.getUsedModules()); + assertEquals(Collections.emptyMap(), manager.getLoadedModules()); + + thrown.expect(ValidationException.class); + thrown.expectMessage("No module with name 'core' exists"); + manager.unloadModule(MODULE_TYPE_CORE); + } + + @Test + public void testUseUnloadedModules() { + thrown.expect(ValidationException.class); + thrown.expectMessage("No module with name 'x' exists"); + manager.useModules(MODULE_TYPE_CORE, "x"); + } + + @Test + public void testUseModulesWithDuplicateModuleName() { + thrown.expect(ValidationException.class); + thrown.expectMessage("Module 'core' appears more than once"); + manager.useModules(MODULE_TYPE_CORE, MODULE_TYPE_CORE); + } + + @Test + public void testUseModules() { + ModuleMock x = new ModuleMock("x"); + ModuleMock y = new ModuleMock("y"); + ModuleMock z = new ModuleMock("z"); + manager.loadModule(x.getType(), x); + manager.loadModule(y.getType(), y); + manager.loadModule(z.getType(), z); + + assertEquals(Arrays.asList(MODULE_TYPE_CORE, "x", "y", "z"), manager.getUsedModules()); + + // test order for used modules + manager.useModules("z", MODULE_TYPE_CORE); + assertEquals(Arrays.asList("z", MODULE_TYPE_CORE), manager.getUsedModules()); + + // test unmentioned modules are still loaded + Map<String, Module> expectedLoadedModules = new HashMap<>(); + expectedLoadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE); + expectedLoadedModules.put("x", x); + expectedLoadedModules.put("y", y); + expectedLoadedModules.put("z", z); + assertEquals(expectedLoadedModules, manager.getLoadedModules()); + } + + @Test + public void testListModules() { + ModuleMock y = new ModuleMock("y"); + ModuleMock z = new ModuleMock("z"); + manager.loadModule("y", y); + manager.loadModule("z", z); + manager.useModules("z", "y"); + + assertEquals(Arrays.asList("z", "y"), manager.listModules()); + } + + @Test + public void testListFullModules() { + ModuleMock x = new ModuleMock("x"); + ModuleMock y = new ModuleMock("y"); + ModuleMock z = new ModuleMock("z"); + + manager.loadModule("y", y); + manager.loadModule("x", x); + manager.loadModule("z", z); + manager.useModules("z", "y"); + + assertEquals( + getExpectedModuleEntries(2, "z", "y", MODULE_TYPE_CORE, "x"), + manager.listFullModules()); + } + + @Test + public void testListFunctions() { + ModuleMock x = new ModuleMock("x"); + manager.loadModule(x.getType(), x); + + assertTrue(manager.listFunctions().contains("dummy")); + + // should not return function name of an unused module + manager.useModules(MODULE_TYPE_CORE); + assertFalse(manager.listFunctions().contains("dummy")); + } + + @Test + public void testGetFunctionDefinition() { + ModuleMock x = new ModuleMock("x"); + manager.loadModule(x.getType(), x); + + assertTrue(manager.getFunctionDefinition("dummy").isPresent()); + + // should not return function definition of an unused module + manager.useModules(MODULE_TYPE_CORE); + assertFalse(manager.getFunctionDefinition("dummy").isPresent()); + } + + private static List<ModuleEntry> getExpectedModuleEntries(int index, String... names) { + return IntStream.range(0, names.length) + .mapToObj(i -> new ModuleEntry(names[i], i < index)) + .collect(Collectors.toList()); + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ModuleMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ModuleMock.java new file mode 100644 index 0000000..86f1626 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ModuleMock.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils; + +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.FunctionKind; +import org.apache.flink.table.module.Module; +import org.apache.flink.table.types.inference.utils.FunctionDefinitionMock; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +/** Mocking {@link Module} for tests. */ +public class ModuleMock implements Module { + private static final Set<String> BUILT_IN_FUNCTIONS = + Collections.unmodifiableSet(new HashSet<>(Collections.singletonList("dummy"))); + private final String type; + private final FunctionDefinitionMock functionDef; + + public ModuleMock(String type) { + this.type = type; + functionDef = new FunctionDefinitionMock(); + functionDef.functionKind = FunctionKind.OTHER; + } + + public String getType() { + return type; + } + + @Override + public Set<String> listFunctions() { + return BUILT_IN_FUNCTIONS; + } + + @Override + public Optional<FunctionDefinition> getFunctionDefinition(String name) { + if (BUILT_IN_FUNCTIONS.contains(name)) { + return Optional.of(functionDef); + } + return Optional.empty(); + } +} diff --git a/flink-table/flink-table-api-java/src/test/resources/log4j2-test.properties b/flink-table/flink-table-api-java/src/test/resources/log4j2-test.properties new file mode 100644 index 0000000..8bb9fe6 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/resources/log4j2-test.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level=OFF +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index e304e3c..1fa57d2 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -34,7 +34,7 @@ import org.apache.flink.table.expressions.resolver.SqlExpressionResolver import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactoryContextImpl} import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction, _} -import org.apache.flink.table.module.{Module, ModuleManager} +import org.apache.flink.table.module.{Module, ModuleEntry, ModuleManager} import org.apache.flink.table.operations.ddl._ import org.apache.flink.table.operations.utils.OperationTreeBuilder import org.apache.flink.table.operations.{CatalogQueryOperation, TableSourceQueryOperation, _} @@ -45,7 +45,6 @@ import org.apache.flink.table.types.{AbstractDataType, DataType} import org.apache.flink.table.util.JavaScalaConversionUtil import org.apache.flink.table.utils.PrintUtils import org.apache.flink.types.Row - import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema import org.apache.calcite.sql.parser.SqlParser import org.apache.calcite.tools.FrameworkConfig @@ -53,7 +52,6 @@ import org.apache.calcite.tools.FrameworkConfig import _root_.java.lang.{Iterable => JIterable, Long => JLong} import _root_.java.util.function.{Function => JFunction, Supplier => JSupplier} import _root_.java.util.{Optional, Collections => JCollections, HashMap => JHashMap, List => JList, Map => JMap} - import _root_.scala.collection.JavaConversions._ import _root_.scala.collection.JavaConverters._ import _root_.scala.util.Try @@ -283,6 +281,10 @@ abstract class TableEnvImpl( moduleManager.loadModule(moduleName, module) } + override def useModules(moduleNames: String*): Unit = { + moduleManager.useModules(moduleNames: _*) + } + override def unloadModule(moduleName: String): Unit = { moduleManager.unloadModule(moduleName) } @@ -461,6 +463,10 @@ abstract class TableEnvImpl( moduleManager.listModules().asScala.toArray } + override def listFullModules(): Array[ModuleEntry] = { + moduleManager.listFullModules().asScala.toArray + } + override def listCatalogs(): Array[String] = { catalogManager.listCatalogs .asScala diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala index 0f69ff3..7d6bb4c 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala @@ -20,14 +20,13 @@ package org.apache.flink.table.utils import java.lang.{Iterable => JIterable} import java.util.Optional - import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.table.api.{ExplainDetail, StatementSet, Table, TableConfig, TableEnvironment, TableResult} import org.apache.flink.table.catalog.Catalog import org.apache.flink.table.descriptors.{ConnectTableDescriptor, ConnectorDescriptor} import org.apache.flink.table.expressions.Expression import org.apache.flink.table.functions.{ScalarFunction, UserDefinedFunction} -import org.apache.flink.table.module.Module +import org.apache.flink.table.module.{Module, ModuleEntry} import org.apache.flink.table.sources.TableSource import org.apache.flink.table.types.AbstractDataType @@ -47,6 +46,8 @@ class MockTableEnvironment extends TableEnvironment { override def listModules(): Array[String] = ??? + override def listFullModules(): Array[ModuleEntry] = ??? + override def listDatabases(): Array[String] = ??? override def listTables(): Array[String] = ??? @@ -102,6 +103,8 @@ class MockTableEnvironment extends TableEnvironment { override def loadModule(moduleName: String, module: Module): Unit = ??? + override def useModules(moduleNames: String*): Unit = ??? + override def unloadModule(moduleName: String): Unit = ??? override def createTemporaryView(