This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3f63e03e831 [FLINK-32691] Make it possible to use builtin functions
without catalog/db set
3f63e03e831 is described below
commit 3f63e03e83144e9857834f8db1895637d2aa218a
Author: James Hughes <[email protected]>
AuthorDate: Fri Jul 28 07:28:21 2023 -0400
[FLINK-32691] Make it possible to use builtin functions without catalog/db
set
This commit makes it possible to use built-in functions when no catalog
and/or database is set: function lookup now skips the catalog- and
database-specific locations whenever the current catalog or the current
database is unset.
---
.../flink/table/catalog/FunctionCatalog.java | 21 +++++++++----
.../table/planner/catalog/UnknownCatalogTest.java | 35 ++++++++++++++++++++++
2 files changed, 50 insertions(+), 6 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index b50d47c4865..cbdeed4dfd7 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -45,6 +45,7 @@ import org.apache.flink.table.procedures.Procedure;
import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -579,6 +580,19 @@ public final class FunctionCatalog {
}
//
--------------------------------------------------------------------------------------------
+ private Optional<ContextResolvedFunction>
resolvePreciseFunctionReference(String funcName) {
+ if
(StringUtils.isNullOrWhitespaceOnly(catalogManager.getCurrentCatalog())
+ ||
StringUtils.isNullOrWhitespaceOnly(catalogManager.getCurrentDatabase())) {
+ return Optional.empty();
+ } else {
+ ObjectIdentifier oi =
+ ObjectIdentifier.of(
+ catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase(),
+ funcName);
+ return resolvePreciseFunctionReference(oi);
+ }
+ }
private Optional<ContextResolvedFunction>
resolvePreciseFunctionReference(ObjectIdentifier oi) {
// resolve order:
@@ -647,11 +661,6 @@ public final class FunctionCatalog {
Optional<FunctionDefinition> candidate =
moduleManager.getFunctionDefinition(normalizedName);
- ObjectIdentifier oi =
- ObjectIdentifier.of(
- catalogManager.getCurrentCatalog(),
- catalogManager.getCurrentDatabase(),
- funcName);
return candidate
.map(
@@ -659,7 +668,7 @@ public final class FunctionCatalog {
Optional.of(
ContextResolvedFunction.permanent(
FunctionIdentifier.of(funcName), fd)))
- .orElseGet(() -> resolvePreciseFunctionReference(oi));
+ .orElseGet(() -> resolvePreciseFunctionReference(funcName));
}
@SuppressWarnings("unchecked")
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java
index 250aa35c84f..d28d4bea153 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java
@@ -38,6 +38,7 @@ import java.util.Collections;
import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -55,6 +56,40 @@ public class UnknownCatalogTest {
public static final ResolvedSchema EXPECTED_SCHEMA =
ResolvedSchema.of(Column.physical("i", INT()),
Column.physical("s", STRING()));
+ public static final ResolvedSchema CURRENT_TIMESTAMP_EXPECTED_SCHEMA =
+ ResolvedSchema.of(Column.physical("CURRENT_TIMESTAMP",
TIMESTAMP_LTZ(3).notNull()));
+
+ @Test
+ public void testUnsetCatalogWithSelectCurrentTimestamp() throws Exception {
+ TableEnvironment tEnv = TableEnvironment.create(ENVIRONMENT_SETTINGS);
+
+ tEnv.useCatalog(null);
+ Table table = tEnv.sqlQuery("SELECT CURRENT_TIMESTAMP");
+
+
assertThat(table.getResolvedSchema()).isEqualTo(CURRENT_TIMESTAMP_EXPECTED_SCHEMA);
+ }
+
+ @Test
+ public void testSetCatalogUnsetDatabaseWithSelectCurrentTimestamp() throws
Exception {
+ TableEnvironment tEnv = TableEnvironment.create(ENVIRONMENT_SETTINGS);
+
+ tEnv.useCatalog(BUILTIN_CATALOG);
+ tEnv.useDatabase(null);
+ Table table = tEnv.sqlQuery("SELECT CURRENT_TIMESTAMP");
+
+
assertThat(table.getResolvedSchema()).isEqualTo(CURRENT_TIMESTAMP_EXPECTED_SCHEMA);
+ }
+
+ @Test
+ public void testSetCatalogWithSelectCurrentTimestamp() throws Exception {
+ TableEnvironment tEnv = TableEnvironment.create(ENVIRONMENT_SETTINGS);
+
+ tEnv.useCatalog(BUILTIN_CATALOG);
+ Table table = tEnv.sqlQuery("SELECT CURRENT_TIMESTAMP");
+
+
assertThat(table.getResolvedSchema()).isEqualTo(CURRENT_TIMESTAMP_EXPECTED_SCHEMA);
+ }
+
@Test
public void testUnsetCatalogWithFullyQualified() throws Exception {
TableEnvironment tEnv = TableEnvironment.create(ENVIRONMENT_SETTINGS);