This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 641fb893a3e [FLINK-27376][table] Support current_database built-in function (#19218) 641fb893a3e is described below commit 641fb893a3e3bc7478413984c3287fcdb56ad845 Author: yuxia Luo <luoyu...@alumni.sjtu.edu.cn> AuthorDate: Thu Jul 14 09:44:54 2022 +0800 [FLINK-27376][table] Support current_database built-in function (#19218) --- docs/data/sql_functions.yml | 5 ++++ docs/data/sql_functions_zh.yml | 5 ++++ .../apache/flink/table/module/hive/HiveModule.java | 1 + .../connectors/hive/HiveDialectQueryITCase.java | 17 +++++++++++ .../flink/table/module/hive/HiveModuleTest.java | 4 +-- flink-python/pyflink/table/expressions.py | 12 ++++++-- .../pyflink/table/tests/test_expression.py | 5 ++-- .../org/apache/flink/table/api/Expressions.java | 8 +++++ .../table/api/ImplicitExpressionConversions.scala | 3 ++ .../functions/BuiltInFunctionDefinitions.java | 11 +++++++ .../expressions/converter/DirectConvertRule.java | 5 ++++ .../functions/sql/FlinkSqlOperatorTable.java | 10 +++++++ .../table/planner/utils/InternalConfigOptions.java | 8 +++++ .../planner/codegen/CodeGeneratorContext.scala | 25 ++++++++++++++++ .../planner/codegen/calls/StringCallGen.scala | 6 +++- .../table/planner/delegation/PlannerBase.scala | 6 +++- .../planner/runtime/batch/sql/CalcITCase.scala | 16 ++++++++++ .../planner/runtime/batch/table/CalcITCase.scala | 26 +++++++++++++++++ .../planner/runtime/stream/sql/CalcITCase.scala | 18 ++++++++++++ .../planner/runtime/stream/table/CalcITCase.scala | 34 ++++++++++++++++++++++ 20 files changed, 217 insertions(+), 8 deletions(-) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 24562c01fe3..c3d0f08a9ed 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -1002,3 +1002,8 @@ aggregate: description: Returns the last value in an ordered set of values. 
- sql: LISTAGG(expression [, separator]) description: Concatenates the values of string expressions and places separator values between them. The separator is not added at the end of string. The default value of separator is ','. + +catalog: + - sql: CURRENT_DATABASE() + table: currentDatabase() + description: Return the current database. diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index d7450ae3e82..2f689977248 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -1093,3 +1093,8 @@ aggregate: description: 返回一组有序值中的最后一个值。 - sql: LISTAGG(expression [, separator]) description: 连接字符串表达式的值并在它们之间放置分隔符值。字符串末尾不添加分隔符时则分隔符的默认值为“,”。 + +catalog: + - sql: CURRENT_DATABASE() + table: currentDatabase() + description: 返回当前数据库。 diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java index f83649efd29..17285c0876d 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java @@ -51,6 +51,7 @@ public class HiveModule implements Module { "cume_dist", "current_date", "current_timestamp", + "current_database", "dense_rank", "first_value", "lag", diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java index 1f1e846568e..af710f24730 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java @@ -361,6 +361,23 @@ public class 
HiveDialectQueryITCase { } } + @Test + public void testCurrentDatabase() { + List<Row> result = + CollectionUtil.iteratorToList( + tableEnv.executeSql("select current_database()").collect()); + assertThat(result.toString()).isEqualTo("[+I[default]]"); + tableEnv.executeSql("create database db1"); + tableEnv.executeSql("use db1"); + result = + CollectionUtil.iteratorToList( + tableEnv.executeSql("select current_database()").collect()); + assertThat(result.toString()).isEqualTo("[+I[db1]]"); + // switch to default database for following test use default database + tableEnv.executeSql("use default"); + tableEnv.executeSql("drop database db1"); + } + private void runQFile(File qfile) throws Exception { QTest qTest = extractQTest(qfile); for (int i = 0; i < qTest.statements.size(); i++) { diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java index f7933d9dc5d..c8a9b6a4429 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java @@ -74,10 +74,10 @@ public class HiveModuleTest { private void verifyNumBuiltInFunctions(String hiveVersion, HiveModule hiveModule) { switch (hiveVersion) { case HIVE_VERSION_V2_3_9: - assertThat(hiveModule.listFunctions()).hasSize(275); + assertThat(hiveModule.listFunctions()).hasSize(274); break; case HIVE_VERSION_V3_1_1: - assertThat(hiveModule.listFunctions()).hasSize(294); + assertThat(hiveModule.listFunctions()).hasSize(293); break; default: fail("Unknown test version " + hiveVersion); diff --git a/flink-python/pyflink/table/expressions.py b/flink-python/pyflink/table/expressions.py index d062aeecd0f..33114183746 100644 --- a/flink-python/pyflink/table/expressions.py +++ 
b/flink-python/pyflink/table/expressions.py @@ -25,8 +25,9 @@ from pyflink.table.udf import UserDefinedFunctionWrapper from pyflink.util.java_utils import to_jarray, load_java_class __all__ = ['if_then_else', 'lit', 'col', 'range_', 'and_', 'or_', 'not_', 'UNBOUNDED_ROW', - 'UNBOUNDED_RANGE', 'CURRENT_ROW', 'CURRENT_RANGE', 'current_date', 'current_time', - 'current_timestamp', 'current_watermark', 'local_time', 'local_timestamp', + 'UNBOUNDED_RANGE', 'CURRENT_ROW', 'CURRENT_RANGE', 'current_database', + 'current_date', 'current_time', 'current_timestamp', + 'current_watermark', 'local_time', 'local_timestamp', 'temporal_overlaps', 'date_format', 'timestamp_diff', 'array', 'row', 'map_', 'row_interval', 'pi', 'e', 'rand', 'rand_integer', 'atan2', 'negative', 'concat', 'concat_ws', 'uuid', 'null_of', 'log', 'with_columns', 'without_columns', 'json_string', @@ -208,6 +209,13 @@ all rows with the same sort key as the current row are included in the window. CURRENT_RANGE = Expression("CURRENT_RANGE") # type: Expression +def current_database() -> Expression: + """ + Returns the current database + """ + return _leaf_op("currentDatabase") + + def current_date() -> Expression: """ Returns the current SQL date in local time zone. 
diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py index c8e1bb2cdd4..4e2ab748365 100644 --- a/flink-python/pyflink/table/tests/test_expression.py +++ b/flink-python/pyflink/table/tests/test_expression.py @@ -21,8 +21,8 @@ from pyflink.table import DataTypes from pyflink.table.expression import TimeIntervalUnit, TimePointUnit, JsonExistsOnError, \ JsonValueOnEmptyOrError, JsonType, JsonQueryWrapper, JsonQueryOnEmptyOrError from pyflink.table.expressions import (col, lit, range_, and_, or_, current_date, - current_time, current_timestamp, local_time, - local_timestamp, temporal_overlaps, date_format, + current_time, current_timestamp, current_database, + local_timestamp, local_time, temporal_overlaps, date_format, timestamp_diff, array, row, map_, row_interval, pi, e, rand, rand_integer, atan2, negative, concat, concat_ws, uuid, null_of, log, if_then_else, with_columns, call, @@ -245,6 +245,7 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase): self.assertEqual('unboundedRange()', str(UNBOUNDED_RANGE)) self.assertEqual('currentRow()', str(CURRENT_ROW)) self.assertEqual('currentRange()', str(CURRENT_RANGE)) + self.assertEqual('currentDatabase()', str(current_database())) self.assertEqual('currentDate()', str(current_date())) self.assertEqual('currentTime()', str(current_time())) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java index 8880a1693a4..29d30858fda 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java @@ -279,6 +279,14 @@ public final class Expressions { return apiCall(BuiltInFunctionDefinitions.CURRENT_WATERMARK, rowtimeAttribute); } + /** + * Return the current database, the return type of this 
expression is {@link + * DataTypes#STRING()}. + */ + public static ApiExpression currentDatabase() { + return apiCall(BuiltInFunctionDefinitions.CURRENT_DATABASE); + } + /** * Returns the current SQL time in local time zone, the return type of this expression is {@link * DataTypes#TIME()}, this is a synonym for {@link Expressions#currentTime()}. diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala index a1686d4b26e..90261590143 100644 --- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala +++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala @@ -462,6 +462,9 @@ trait ImplicitExpressionConversions { Expressions.currentWatermark(rowtimeAttribute) } + /** Return the current database, the return type of this expression is [[DataTypes.STRING()]]. */ + def currentDatabase(): Expression = Expressions.currentDatabase() + /** * Returns the current SQL time in local time zone, the return type of this expression is * [[DataTypes.TIME]], this is a synonym for [[ImplicitExpressionConversions.currentTime()]]. 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 16acd6b8260..7f5810dc2f3 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -1382,6 +1382,17 @@ public final class BuiltInFunctionDefinitions { .outputTypeStrategy(nullableIfArgs(SpecificTypeStrategies.ROUND)) .build(); + // -------------------------------------------------------------------------------------------- + // Catalog functions + // -------------------------------------------------------------------------------------------- + + public static final BuiltInFunctionDefinition CURRENT_DATABASE = + BuiltInFunctionDefinition.newBuilder() + .name("currentDatabase") + .kind(SCALAR) + .outputTypeStrategy(explicit(STRING().notNull())) + .build(); + // -------------------------------------------------------------------------------------------- // Time functions // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java index 770a70227ca..85449ea2485 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java @@ -205,6 +205,11 @@ public class DirectConvertRule implements CallExpressionConvertRule { BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, 
FlinkSqlOperatorTable.TO_TIMESTAMP_LTZ); + // catalog functions + DEFINITION_OPERATOR_MAP.put( + BuiltInFunctionDefinitions.CURRENT_DATABASE, + FlinkSqlOperatorTable.CURRENT_DATABASE); + // collection DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.AT, FlinkSqlOperatorTable.ITEM); DEFINITION_OPERATOR_MAP.put( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java index 2b18ffee5e3..75001f4960f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java @@ -53,6 +53,7 @@ import java.util.List; import static org.apache.flink.table.planner.plan.type.FlinkReturnTypes.ARG0_VARCHAR_FORCE_NULLABLE; import static org.apache.flink.table.planner.plan.type.FlinkReturnTypes.STR_MAP_NULLABLE; import static org.apache.flink.table.planner.plan.type.FlinkReturnTypes.VARCHAR_FORCE_NULLABLE; +import static org.apache.flink.table.planner.plan.type.FlinkReturnTypes.VARCHAR_NOT_NULL; /** Operator table that contains only Flink-specific functions and operators. 
*/ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable { @@ -1173,4 +1174,13 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable { public static final SqlFunction TUMBLE = new SqlTumbleTableFunction(); public static final SqlFunction HOP = new SqlHopTableFunction(); public static final SqlFunction CUMULATE = new SqlCumulateTableFunction(); + + // Catalog Functions + public static final SqlFunction CURRENT_DATABASE = + BuiltInSqlFunction.newBuilder() + .name("CURRENT_DATABASE") + .returnType(VARCHAR_NOT_NULL) + .operandTypeChecker(OperandTypes.NILADIC) + .notDeterministic() + .build(); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/InternalConfigOptions.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/InternalConfigOptions.java index d1a60ff444e..ab3a61a1da3 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/InternalConfigOptions.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/InternalConfigOptions.java @@ -52,6 +52,14 @@ public final class InternalConfigOptions { + " some temporal functions like LOCAL_TIMESTAMP in batch job to make sure these" + " temporal functions has query-start semantics."); + public static final ConfigOption<String> TABLE_QUERY_CURRENT_DATABASE = + key("__table.query-start.current-database__") + .stringType() + .noDefaultValue() + .withDescription( + "The config used to save the current database at query start." 
+ + " Currently, it's only used for the function CURRENT_DATABASE."); + @Experimental public static final ConfigOption<Boolean> TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED = key("__table.exec.sort.non-temporal.enabled__") diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala index f6f54ac5254..390419c6227 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala @@ -547,6 +547,31 @@ class CodeGeneratorContext(val tableConfig: ReadableConfig, val classLoader: Cla fieldTerm } + /** + * Adds a reusable query-level current database to the beginning of the SAM of the generated + * class. + * + * <p> The current database value is evaluated once at query-start. + */ + def addReusableQueryLevelCurrentDatabase(): String = { + val fieldTerm = s"queryCurrentDatabase" + + val queryStartCurrentDatabase = tableConfig + .getOptional(InternalConfigOptions.TABLE_QUERY_CURRENT_DATABASE) + .orElseThrow(new JSupplier[Throwable] { + override def get() = new CodeGenException( + "Try to obtain current database of query-start fail." + + " This is a bug, please file an issue.") + }) + + reusableMemberStatements.add(s""" + |private static final $BINARY_STRING $fieldTerm = + |$BINARY_STRING.fromString("$queryStartCurrentDatabase"); + |""".stripMargin) + + fieldTerm + } + /** Adds a reusable record-level local time to the beginning of the SAM of the generated class. 
*/ def addReusableRecordLevelLocalTime(): String = { val fieldTerm = s"localTime" diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala index 8a052c0ac53..a589a495dee 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala @@ -21,7 +21,7 @@ import org.apache.flink.table.api.DataTypes import org.apache.flink.table.data.util.DataFormatConverters import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedExpression} import org.apache.flink.table.planner.codegen.CodeGenUtils._ -import org.apache.flink.table.planner.codegen.GenerateUtils.{generateCallIfArgsNotNull, generateCallIfArgsNullable, generateStringResultCallIfArgsNotNull} +import org.apache.flink.table.planner.codegen.GenerateUtils.{generateCallIfArgsNotNull, generateCallIfArgsNullable, generateNonNullField, generateStringResultCallIfArgsNotNull} import org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens._ import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable._ import org.apache.flink.table.runtime.functions.SqlFunctionUtils @@ -233,6 +233,10 @@ object StringCallGen { isCharacterString(operands(2).resultType) => methodGen(BuiltInMethods.CONVERT_TZ) + case CURRENT_DATABASE => + val currentDatabase = ctx.addReusableQueryLevelCurrentDatabase() + generateNonNullField(returnType, currentDatabase) + case _ => null } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index 401e6bf32da..c5cc1525a4d 100644 --- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -50,7 +50,7 @@ import org.apache.flink.table.planner.plan.reuse.SubplanReuser import org.apache.flink.table.planner.plan.utils.SameRelObjectShuttle import org.apache.flink.table.planner.sinks.DataStreamTableSink import org.apache.flink.table.planner.sinks.TableSinkUtils.{inferSinkPhysicalSchema, validateLogicalPhysicalTypesCompatible, validateTableSink} -import org.apache.flink.table.planner.utils.InternalConfigOptions.{TABLE_QUERY_START_EPOCH_TIME, TABLE_QUERY_START_LOCAL_TIME} +import org.apache.flink.table.planner.utils.InternalConfigOptions.{TABLE_QUERY_CURRENT_DATABASE, TABLE_QUERY_START_EPOCH_TIME, TABLE_QUERY_START_LOCAL_TIME} import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.{toJava, toScala} import org.apache.flink.table.planner.utils.TableConfigUtils import org.apache.flink.table.runtime.generated.CompileUtils @@ -450,6 +450,9 @@ abstract class PlannerBase( TimeZone.getTimeZone(TableConfigUtils.getLocalTimeZone(tableConfig)).getOffset(epochTime) tableConfig.set(TABLE_QUERY_START_LOCAL_TIME, localTime) + val currentDatabase = catalogManager.getCurrentDatabase + tableConfig.set(TABLE_QUERY_CURRENT_DATABASE, currentDatabase) + // We pass only the configuration to avoid reconfiguration with the rootConfiguration getExecEnv.configure(tableConfig.getConfiguration, Thread.currentThread().getContextClassLoader) @@ -466,6 +469,7 @@ abstract class PlannerBase( val configuration = tableConfig.getConfiguration configuration.removeConfig(TABLE_QUERY_START_EPOCH_TIME) configuration.removeConfig(TABLE_QUERY_START_LOCAL_TIME) + configuration.removeConfig(TABLE_QUERY_CURRENT_DATABASE) // Clean caches that might have filled up during optimization CompileUtils.cleanUp() diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala index dd5a3ca4dbe..d895c028f58 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala @@ -28,6 +28,7 @@ import org.apache.flink.api.scala._ import org.apache.flink.table.api.{DataTypes, TableSchema, ValidationException} import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.api.config.ExecutionConfigOptions.LegacyCastBehaviour +import org.apache.flink.table.catalog.CatalogDatabaseImpl import org.apache.flink.table.data.{DecimalDataUtils, TimestampData} import org.apache.flink.table.data.util.DataFormatConverters.LocalDateConverter import org.apache.flink.table.planner.expressions.utils.{RichFunc1, RichFunc2, RichFunc3, SplitUDF} @@ -2098,4 +2099,19 @@ class CalcITCase extends BatchTestBase { Seq(row(1, 1, 2, 1, 3, 4, 1, 1, 2, 1, 3, 4, 1.0, 1.0, 2.0, 2.0, 2.0, null)) ) } + + @Test + def testCurrentDatabase(): Unit = { + checkResult("SELECT CURRENT_DATABASE()", Seq(row(tEnv.getCurrentDatabase))) + // switch to another database + tEnv + .getCatalog(tEnv.getCurrentCatalog) + .get() + .createDatabase( + "db1", + new CatalogDatabaseImpl(new util.HashMap[String, String](), "db1"), + false) + tEnv.useDatabase("db1") + checkResult("SELECT CURRENT_DATABASE()", Seq(row(tEnv.getCurrentDatabase))) + } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala index d0aa026c41c..83b741867b0 100644 --- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.runtime.batch.table import org.apache.flink.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.DataTypes._ +import org.apache.flink.table.catalog.CatalogDatabaseImpl import org.apache.flink.table.data.DecimalDataUtils import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.table.planner.expressions.utils._ @@ -640,6 +641,31 @@ class CalcITCase extends BatchTestBase { TestBaseUtils.compareResultAsText(results.asJava, expected) } + @Test + def testCurrentDatabase(): Unit = { + val result1 = executeQuery( + tEnv + .from("Table3") + .limit(1) + .select(currentDatabase())) + TestBaseUtils.compareResultAsText(result1.asJava, "default_database") + + // switch to another database + tEnv + .getCatalog(tEnv.getCurrentCatalog) + .get() + .createDatabase( + "db1", + new CatalogDatabaseImpl(new util.HashMap[String, String](), "db1"), + false) + tEnv.useDatabase("db1") + val result2 = executeQuery( + tEnv + .from("default_database.Table3") + .limit(1) + .select(currentDatabase())) + TestBaseUtils.compareResultAsText(result2.asJava, "db1") + } } @SerialVersionUID(1L) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala index 3126c4520de..1381851eb40 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala @@ -26,6 +26,7 @@ import
org.apache.flink.table.api.bridge.scala._ import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.api.config.ExecutionConfigOptions.LegacyCastBehaviour import org.apache.flink.table.api.internal.TableEnvironmentInternal +import org.apache.flink.table.catalog.CatalogDatabaseImpl import org.apache.flink.table.data.{GenericRowData, MapData, RowData} import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.runtime.utils._ @@ -696,4 +697,21 @@ class CalcITCase extends StreamingTestBase { TestBaseUtils.compareResultAsText(result, "1,1,2,1,3,4,1,1,2,1,3,4,1.0,1.0,2.0,2.0,2.0,null") } + @Test + def testCurrentDatabase(): Unit = { + val result1 = tEnv.sqlQuery("SELECT CURRENT_DATABASE()").execute().collect().toList + assertEquals(Seq(row(tEnv.getCurrentDatabase)), result1) + + // switch to another database + tEnv + .getCatalog(tEnv.getCurrentCatalog) + .get() + .createDatabase( + "db1", + new CatalogDatabaseImpl(new util.HashMap[String, String](), "db1"), + false) + tEnv.useDatabase("db1") + val result2 = tEnv.sqlQuery("SELECT CURRENT_DATABASE()").execute().collect().toList + assertEquals(Seq(row(tEnv.getCurrentDatabase)), result2) + } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala index 0ddc5b39372..4d1141e36f2 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala @@ -21,6 +21,7 @@ import org.apache.flink.api.scala._ import org.apache.flink.table.annotation.{DataTypeHint, InputGroup} import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ +import 
org.apache.flink.table.catalog.CatalogDatabaseImpl import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.table.planner.expressions.utils._ import org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink, TestingRetractSink, UserDefinedFunctionTestUtils} @@ -657,6 +658,39 @@ class CalcITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode val expected = List("0,0,0", "1,1,1", "2,2,2") assertEquals(expected.sorted, sink.getAppendResults.sorted) } + + @Test + def testCurrentDatabase(): Unit = { + val result1 = + env + .fromCollection(tupleData3) + .toTable(tEnv) + .limit(1) + .select(currentDatabase()) + val sink1 = new TestingAppendSink + result1.toAppendStream[Row].addSink(sink1) + env.execute() + assertEquals(List(tEnv.getCurrentDatabase), sink1.getAppendResults.sorted) + + // switch to another database + tEnv + .getCatalog(tEnv.getCurrentCatalog) + .get() + .createDatabase( + "db1", + new CatalogDatabaseImpl(new util.HashMap[String, String](), "db1"), + false) + tEnv.useDatabase("db1") + val result2 = env + .fromCollection(tupleData3) + .toTable(tEnv) + .limit(1) + .select(currentDatabase()) + val sink2 = new TestingAppendSink + result2.toAppendStream[Row].addSink(sink2) + env.execute() + assertEquals(List(tEnv.getCurrentDatabase), sink2.getAppendResults.sorted) + } } @SerialVersionUID(1L)