This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f63e03e831 [FLINK-32691] Make it possible to use builtin functions without catalog/db set
3f63e03e831 is described below

commit 3f63e03e83144e9857834f8db1895637d2aa218a
Author: James Hughes <[email protected]>
AuthorDate: Fri Jul 28 07:28:21 2023 -0400

    [FLINK-32691] Make it possible to use builtin functions without catalog/db set
    
    This commit makes it possible to use builtin functions when no current
    catalog and/or database is set. Function lookup now skips the catalog- and
    database-specific locations whenever the catalog and/or database is unset.
---
 .../flink/table/catalog/FunctionCatalog.java       | 21 +++++++++----
 .../table/planner/catalog/UnknownCatalogTest.java  | 35 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 6 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index b50d47c4865..cbdeed4dfd7 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -45,6 +45,7 @@ import org.apache.flink.table.procedures.Procedure;
 import org.apache.flink.table.resource.ResourceManager;
 import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -579,6 +580,19 @@ public final class FunctionCatalog {
     }
 
     // 
--------------------------------------------------------------------------------------------
+    private Optional<ContextResolvedFunction> 
resolvePreciseFunctionReference(String funcName) {
+        if 
(StringUtils.isNullOrWhitespaceOnly(catalogManager.getCurrentCatalog())
+                || 
StringUtils.isNullOrWhitespaceOnly(catalogManager.getCurrentDatabase())) {
+            return Optional.empty();
+        } else {
+            ObjectIdentifier oi =
+                    ObjectIdentifier.of(
+                            catalogManager.getCurrentCatalog(),
+                            catalogManager.getCurrentDatabase(),
+                            funcName);
+            return resolvePreciseFunctionReference(oi);
+        }
+    }
 
     private Optional<ContextResolvedFunction> 
resolvePreciseFunctionReference(ObjectIdentifier oi) {
         // resolve order:
@@ -647,11 +661,6 @@ public final class FunctionCatalog {
 
         Optional<FunctionDefinition> candidate =
                 moduleManager.getFunctionDefinition(normalizedName);
-        ObjectIdentifier oi =
-                ObjectIdentifier.of(
-                        catalogManager.getCurrentCatalog(),
-                        catalogManager.getCurrentDatabase(),
-                        funcName);
 
         return candidate
                 .map(
@@ -659,7 +668,7 @@ public final class FunctionCatalog {
                                 Optional.of(
                                         ContextResolvedFunction.permanent(
                                                 
FunctionIdentifier.of(funcName), fd)))
-                .orElseGet(() -> resolvePreciseFunctionReference(oi));
+                .orElseGet(() -> resolvePreciseFunctionReference(funcName));
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java
index 250aa35c84f..d28d4bea153 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java
@@ -38,6 +38,7 @@ import java.util.Collections;
 
 import static org.apache.flink.table.api.DataTypes.INT;
 import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -55,6 +56,40 @@ public class UnknownCatalogTest {
     public static final ResolvedSchema EXPECTED_SCHEMA =
             ResolvedSchema.of(Column.physical("i", INT()), 
Column.physical("s", STRING()));
 
+    public static final ResolvedSchema CURRENT_TIMESTAMP_EXPECTED_SCHEMA =
+            ResolvedSchema.of(Column.physical("CURRENT_TIMESTAMP", 
TIMESTAMP_LTZ(3).notNull()));
+
+    @Test
+    public void testUnsetCatalogWithSelectCurrentTimestamp() throws Exception {
+        TableEnvironment tEnv = TableEnvironment.create(ENVIRONMENT_SETTINGS);
+
+        tEnv.useCatalog(null);
+        Table table = tEnv.sqlQuery("SELECT CURRENT_TIMESTAMP");
+
+        
assertThat(table.getResolvedSchema()).isEqualTo(CURRENT_TIMESTAMP_EXPECTED_SCHEMA);
+    }
+
+    @Test
+    public void testSetCatalogUnsetDatabaseWithSelectCurrentTimestamp() throws 
Exception {
+        TableEnvironment tEnv = TableEnvironment.create(ENVIRONMENT_SETTINGS);
+
+        tEnv.useCatalog(BUILTIN_CATALOG);
+        tEnv.useDatabase(null);
+        Table table = tEnv.sqlQuery("SELECT CURRENT_TIMESTAMP");
+
+        
assertThat(table.getResolvedSchema()).isEqualTo(CURRENT_TIMESTAMP_EXPECTED_SCHEMA);
+    }
+
+    @Test
+    public void testSetCatalogWithSelectCurrentTimestamp() throws Exception {
+        TableEnvironment tEnv = TableEnvironment.create(ENVIRONMENT_SETTINGS);
+
+        tEnv.useCatalog(BUILTIN_CATALOG);
+        Table table = tEnv.sqlQuery("SELECT CURRENT_TIMESTAMP");
+
+        
assertThat(table.getResolvedSchema()).isEqualTo(CURRENT_TIMESTAMP_EXPECTED_SCHEMA);
+    }
+
     @Test
     public void testUnsetCatalogWithFullyQualified() throws Exception {
         TableEnvironment tEnv = TableEnvironment.create(ENVIRONMENT_SETTINGS);

Reply via email to