This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.3 by this push:
new ce0e41e9421 [FLINK-37599][table] Support column expansion for PTF
on_time columns
ce0e41e9421 is described below
commit ce0e41e9421e98f54df59a74e46a9fadb51f2e27
Author: Timo Walther <[email protected]>
AuthorDate: Tue Apr 21 15:51:47 2026 +0200
[FLINK-37599][table] Support column expansion for PTF on_time columns
This closes #27976.
---
.../planner/calcite/FlinkCalciteSqlValidator.java | 184 ++++++++++++++-------
.../flink/table/planner/utils/ShortcutUtils.java | 13 ++
.../plan/stream/sql/ColumnExpansionTest.java | 89 +++++++++-
3 files changed, 216 insertions(+), 70 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
index a65cb882a43..77a1634fa0b 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
@@ -27,9 +27,9 @@ import
org.apache.flink.table.api.config.TableConfigOptions.ColumnExpansionStrat
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.planner.catalog.CatalogSchemaModel;
import org.apache.flink.table.planner.catalog.CatalogSchemaTable;
-import org.apache.flink.table.planner.functions.sql.ml.SqlMLTableFunction;
import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader;
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
import org.apache.flink.table.planner.utils.ShortcutUtils;
@@ -65,6 +65,8 @@ import org.apache.calcite.sql.SqlTableFunction;
import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.SqlWindowTableFunction;
import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlOperandMetadata;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.calcite.sql.validate.DelegatingScope;
import org.apache.calcite.sql.validate.IdentifierNamespace;
@@ -92,7 +94,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL;
import static
org.apache.flink.table.expressions.resolver.lookups.FieldReferenceLookup.includeExpandedColumn;
@@ -343,7 +344,7 @@ public final class FlinkCalciteSqlValidator extends
FlinkSqlParsingValidator {
final Column column =
resolvedSchema.getColumn(columnName).orElse(null);
if (qualified.suffix().size() == 1 && column != null) {
if (includeExpandedColumn(column,
columnExpansionStrategies)
- || declaredDescriptorColumn(scope, column)) {
+ || isDeclaredOnTimeColumn(scope, column)) {
super.addToSelectList(
list, aliases, fieldList, exp, scope,
includeSystemVars);
}
@@ -360,15 +361,16 @@ public final class FlinkCalciteSqlValidator extends
FlinkSqlParsingValidator {
protected @PolyNull SqlNode performUnconditionalRewrites(
@PolyNull SqlNode node, boolean underFrom) {
- // Special case for window TVFs like:
- // TUMBLE(TABLE t, DESCRIPTOR(metadata_virtual), INTERVAL '1' MINUTE))
or
- // SESSION(TABLE t PARTITION BY a, DESCRIPTOR(metadata_virtual),
INTERVAL '1' MINUTE))
+ // Capture table arguments early:
+ // TUMBLE(TABLE t, DESCRIPTOR(metadata_virtual), INTERVAL '1' MINUTE)
or
+ // SESSION(TABLE t PARTITION BY a, DESCRIPTOR(metadata_virtual),
INTERVAL '1' MINUTE)
+ // MyPtf(in => TABLE t PARTITION BY a, on_time =>
DESCRIPTOR(metadata_virtual))
//
// "TABLE t" is translated into an implicit "SELECT * FROM t". This
would ignore columns
- // that are not expanded by default. However, the descriptor
explicitly states the need
- // for this column. Therefore, explicit table expressions (for window
TVFs at most one)
- // are captured before rewriting and replaced with a "marker"
SqlSelect that contains the
- // descriptor information. The "marker" SqlSelect is considered during
column expansion.
+ // that are not expanded by default. However, the on_time descriptor
explicitly states the
+ // need for time columns. Therefore, explicit table expressions are
captured before
+ // rewriting and replaced with a "marker" SqlSelect that contains the
descriptor
+ // information. The "marker" SqlSelect is considered during column
expansion.
final List<SqlIdentifier> tableArgs = getTableOperands(node);
final SqlNode rewritten = super.performUnconditionalRewrites(node,
underFrom);
@@ -376,55 +378,54 @@ public final class FlinkCalciteSqlValidator extends
FlinkSqlParsingValidator {
if (!(node instanceof SqlBasicCall)) {
return rewritten;
}
+
final SqlBasicCall call = (SqlBasicCall) node;
- final SqlOperator operator = call.getOperator();
+ // Special case for MODEL
if (node instanceof SqlExplicitModelCall) {
// Convert it so that model can be accessed in planner.
SqlExplicitModelCall
// from parser can't access model.
- SqlExplicitModelCall modelCall = (SqlExplicitModelCall) node;
- SqlIdentifier modelIdentifier = modelCall.getModelIdentifier();
- FlinkCalciteCatalogReader catalogReader =
+ final SqlExplicitModelCall modelCall = (SqlExplicitModelCall) node;
+ final SqlIdentifier modelIdentifier =
modelCall.getModelIdentifier();
+ final FlinkCalciteCatalogReader catalogReader =
(FlinkCalciteCatalogReader) getCatalogReader();
- CatalogSchemaModel model =
catalogReader.getModel(modelIdentifier.names);
+ final CatalogSchemaModel model =
catalogReader.getModel(modelIdentifier.names);
if (model != null) {
return new SqlModelCall(modelCall, model);
}
}
- // TODO (FLINK-37819): add test for SqlMLTableFunction
- if (operator instanceof SqlWindowTableFunction || operator instanceof
SqlMLTableFunction) {
- if (tableArgs.stream().allMatch(Objects::isNull)) {
- return rewritten;
- }
-
- final List<SqlIdentifier> descriptors =
- call.getOperandList().stream()
-
.flatMap(FlinkCalciteSqlValidator::extractDescriptors)
- .collect(Collectors.toList());
-
+ // Mark rewritten "TABLE t" with on_time columns
+ if (tableArgs == null || tableArgs.stream().allMatch(Objects::isNull))
{
+ return rewritten;
+ }
+ final List<SqlIdentifier> onTimeColumns = extractOnTime(call);
+ if (onTimeColumns != null) {
for (int i = 0; i < call.operandCount(); i++) {
final SqlIdentifier tableArg = tableArgs.get(i);
if (tableArg != null) {
- final SqlNode opReplacement = new
ExplicitTableSqlSelect(tableArg, descriptors);
+ final SqlNode opReplacement =
+ new ExplicitTableSqlSelect(tableArg,
onTimeColumns);
+ // for f(TABLE t PARTITION BY c, ...)
if (call.operand(i).getKind() ==
SqlKind.SET_SEMANTICS_TABLE) {
final SqlCall setSemanticsTable = call.operand(i);
setSemanticsTable.setOperand(0, opReplacement);
} else if (call.operand(i).getKind() ==
SqlKind.ARGUMENT_ASSIGNMENT) {
- // for TUMBLE(DATA => TABLE t3, ...)
final SqlCall assignment = call.operand(i);
+ // for f(in => TABLE t PARTITION BY c, ...)
if (assignment.operand(0).getKind() ==
SqlKind.SET_SEMANTICS_TABLE) {
- final SqlCall setSemanticsTable =
assignment.operand(i);
+ final SqlCall setSemanticsTable =
assignment.operand(0);
setSemanticsTable.setOperand(0, opReplacement);
} else {
+ // for f(in => TABLE t, ...)
assignment.setOperand(0, opReplacement);
}
} else {
- // for TUMBLE(TABLE t3, ...)
+ // for f(TABLE t, ...)
call.setOperand(i, opReplacement);
}
}
- // for TUMBLE([DATA =>] SELECT ..., ...)
+ // for f([in =>] SELECT ..., ...)
}
}
@@ -446,9 +447,9 @@ public final class FlinkCalciteSqlValidator extends
FlinkSqlParsingValidator {
*/
static class ExplicitTableSqlSelect extends SqlSelect {
- private final List<SqlIdentifier> descriptors;
+ private final List<SqlIdentifier> onTimeColumns;
- public ExplicitTableSqlSelect(SqlIdentifier table, List<SqlIdentifier>
descriptors) {
+ public ExplicitTableSqlSelect(SqlIdentifier table, List<SqlIdentifier>
onTimeColumns) {
super(
SqlParserPos.ZERO,
null,
@@ -462,7 +463,7 @@ public final class FlinkCalciteSqlValidator extends
FlinkSqlParsingValidator {
null,
null,
null);
- this.descriptors = descriptors;
+ this.onTimeColumns = onTimeColumns;
}
}
@@ -470,30 +471,32 @@ public final class FlinkCalciteSqlValidator extends
FlinkSqlParsingValidator {
* Returns whether the given column has been declared in a {@link
SqlKind#DESCRIPTOR} next to a
* {@link SqlKind#EXPLICIT_TABLE} within TVF operands.
*/
- private static boolean declaredDescriptorColumn(SelectScope scope, Column
column) {
+ private static boolean isDeclaredOnTimeColumn(SelectScope scope, Column
column) {
if (!(scope.getNode() instanceof ExplicitTableSqlSelect)) {
return false;
}
final ExplicitTableSqlSelect select = (ExplicitTableSqlSelect)
scope.getNode();
- return select.descriptors.stream()
+ return select.onTimeColumns.stream()
.map(SqlIdentifier::getSimple)
.anyMatch(id -> id.equals(column.getName()));
}
/**
* Returns all {@link SqlKind#EXPLICIT_TABLE} and {@link
SqlKind#SET_SEMANTICS_TABLE} operands
- * within TVF operands. A list entry is {@code null} if the operand is not
an {@link
+ * within PTF operands. A list entry is {@code null} if the operand is not
an {@link
* SqlKind#EXPLICIT_TABLE} or {@link SqlKind#SET_SEMANTICS_TABLE}.
*/
private static List<SqlIdentifier> getTableOperands(SqlNode node) {
if (!(node instanceof SqlBasicCall)) {
return null;
}
+
final SqlBasicCall call = (SqlBasicCall) node;
if (!(call.getOperator() instanceof SqlFunction)) {
return null;
}
+
final SqlFunction function = (SqlFunction) call.getOperator();
if (!isTableFunction(function)) {
@@ -501,52 +504,109 @@ public final class FlinkCalciteSqlValidator extends
FlinkSqlParsingValidator {
}
return call.getOperandList().stream()
- .map(FlinkCalciteSqlValidator::extractTableOperand)
+ .map(FlinkCalciteSqlValidator::extractExplicitTables)
.collect(Collectors.toList());
}
- private static @Nullable SqlIdentifier extractTableOperand(SqlNode op) {
+ /** Extracts "TABLE t" nodes before they get rewritten into "SELECT * FROM
t". */
+ private static @Nullable SqlIdentifier extractExplicitTables(SqlNode op) {
if (op.getKind() == SqlKind.EXPLICIT_TABLE) {
final SqlBasicCall opCall = (SqlBasicCall) op;
if (opCall.operandCount() == 1 && opCall.operand(0) instanceof
SqlIdentifier) {
- // for TUMBLE(TABLE t3, ...)
+ // for f(TABLE t, ...)
return opCall.operand(0);
}
} else if (op.getKind() == SqlKind.SET_SEMANTICS_TABLE) {
- // for SESSION windows
+ // for f(TABLE t PARTITION BY x)
final SqlBasicCall opCall = (SqlBasicCall) op;
- final SqlCall setSemanticsTable = opCall.operand(0);
- if (setSemanticsTable.operand(0) instanceof SqlIdentifier) {
- return setSemanticsTable.operand(0);
- }
+ return extractExplicitTables(opCall.operand(0));
} else if (op.getKind() == SqlKind.ARGUMENT_ASSIGNMENT) {
- // for TUMBLE(DATA => TABLE t3, ...)
+ // for f(in => TABLE t, ...)
final SqlBasicCall opCall = (SqlBasicCall) op;
- return extractTableOperand(opCall.operand(0));
+ return extractExplicitTables(opCall.operand(0));
}
return null;
}
- private static Stream<SqlIdentifier> extractDescriptors(SqlNode op) {
+ /** Extracts the on_time argument of a PTF (or TIMECOL for window PTFs for
legacy reasons). */
+ private static @Nullable List<SqlIdentifier> extractOnTime(SqlBasicCall
call) {
+ // Extract the time operand from the PTF call
+ final SqlNode onTimeOperand;
+ if (call.getOperator() instanceof SqlWindowTableFunction) {
+ onTimeOperand = extractOperandByArgName(call, "TIMECOL");
+ } else if (ShortcutUtils.isFunctionKind(call.getOperator(),
FunctionKind.PROCESS_TABLE)) {
+ onTimeOperand = extractOperandByArgName(call, "on_time");
+ } else {
+ onTimeOperand = null;
+ }
+
+ // No operand found
+ if (onTimeOperand == null) {
+ return null;
+ }
+
+ return extractDescriptors(onTimeOperand);
+ }
+
+ private static List<SqlIdentifier> extractDescriptors(SqlNode op) {
if (op.getKind() == SqlKind.DESCRIPTOR) {
- // for TUMBLE(..., DESCRIPTOR(col), ...)
final SqlBasicCall opCall = (SqlBasicCall) op;
return opCall.getOperandList().stream()
.filter(SqlIdentifier.class::isInstance)
- .map(SqlIdentifier.class::cast);
- } else if (op.getKind() == SqlKind.SET_SEMANTICS_TABLE) {
- // for SESSION windows
- final SqlBasicCall opCall = (SqlBasicCall) op;
- return ((SqlNodeList) opCall.operand(1))
- .stream()
- .filter(SqlIdentifier.class::isInstance)
- .map(SqlIdentifier.class::cast);
- } else if (op.getKind() == SqlKind.ARGUMENT_ASSIGNMENT) {
- // for TUMBLE(..., TIMECOL => DESCRIPTOR(col), ...)
- final SqlBasicCall opCall = (SqlBasicCall) op;
- return extractDescriptors(opCall.operand(0));
+ .map(SqlIdentifier.class::cast)
+ .collect(Collectors.toList());
+ }
+ return List.of();
+ }
+
+ /**
+ * Returns the operand for a given argument name from a SqlBasicCall.
Supports both positional
+ * and named arguments. If at least one ARGUMENT_ASSIGNMENT is used, named
lookup is performed.
+ * Otherwise, positional lookup using SqlOperandMetadata is used.
+ *
+ * @param call the SQL call to extract the operand from
+ * @param argumentName the name of the argument to retrieve
+ * @return the SqlNode for the operand, or null if not found or not
supported
+ */
+ private static @Nullable SqlNode extractOperandByArgName(
+ SqlBasicCall call, String argumentName) {
+ // Check if operator supports SqlOperandMetadata
+ final SqlOperator operator = call.getOperator();
+ final SqlOperandTypeChecker typeChecker =
operator.getOperandTypeChecker();
+ if (!(typeChecker instanceof SqlOperandMetadata)) {
+ return null;
+ }
+
+ final SqlOperandMetadata operandMetadata = (SqlOperandMetadata)
typeChecker;
+
+ // Detect if named arguments are used by checking for
ARGUMENT_ASSIGNMENT
+ final List<SqlNode> operands = call.getOperandList();
+ final boolean hasNamedArguments =
+ operands.stream().anyMatch(op -> op.getKind() ==
SqlKind.ARGUMENT_ASSIGNMENT);
+
+ if (hasNamedArguments) {
+ // Named mode: search through ARGUMENT_ASSIGNMENT nodes
+ for (SqlNode operand : operands) {
+ if (operand.getKind() == SqlKind.ARGUMENT_ASSIGNMENT) {
+ final SqlBasicCall assignment = (SqlBasicCall) operand;
+ // operand(1) contains the parameter name as SqlIdentifier
+ final SqlIdentifier paramName = assignment.operand(1);
+ if (paramName.getSimple().equals(argumentName)) {
+ // operand(0) contains the actual value
+ return assignment.operand(0);
+ }
+ }
+ }
+ return null;
+ } else {
+ // Positional mode: use SqlOperandMetadata to map name to position
+ final List<String> paramNames = operandMetadata.paramNames();
+ final int index = paramNames.indexOf(argumentName);
+ if (index == -1 || index >= call.operandCount()) {
+ return null;
+ }
+ return call.operand(index);
}
- return Stream.empty();
}
private static boolean isTableFunction(SqlFunction function) {
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
index 38bc5eb038d..415b78efeac 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.planner.calcite.FlinkContext;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.delegation.PlannerBase;
@@ -156,6 +157,18 @@ public final class ShortcutUtils {
return ((BridgingSqlFunction) call.getOperator()).getDefinition();
}
+ public static @Nullable FunctionDefinition
unwrapFunctionDefinition(SqlOperator operator) {
+ if (!(operator instanceof BridgingSqlFunction)) {
+ return null;
+ }
+ return ((BridgingSqlFunction) operator).getDefinition();
+ }
+
+ public static boolean isFunctionKind(SqlOperator operator, FunctionKind
kind) {
+ final FunctionDefinition functionDefinition =
unwrapFunctionDefinition(operator);
+ return functionDefinition != null && functionDefinition.getKind() ==
kind;
+ }
+
public static @Nullable BridgingSqlFunction
unwrapBridgingSqlFunction(RexCall call) {
final SqlOperator operator = call.getOperator();
if (operator instanceof BridgingSqlFunction) {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
index 19ea817ba20..79d1c3eacc2 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
@@ -18,20 +18,30 @@
package org.apache.flink.table.planner.plan.stream.sql;
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.types.Row;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
+import static
org.apache.flink.table.annotation.ArgumentTrait.OPTIONAL_PARTITION_BY;
+import static
org.apache.flink.table.annotation.ArgumentTrait.PASS_COLUMNS_THROUGH;
+import static
org.apache.flink.table.annotation.ArgumentTrait.ROW_SEMANTIC_TABLE;
+import static
org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;
import static
org.apache.flink.table.api.config.TableConfigOptions.ColumnExpansionStrategy.EXCLUDE_ALIASED_VIRTUAL_METADATA_COLUMNS;
import static
org.apache.flink.table.api.config.TableConfigOptions.ColumnExpansionStrategy.EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS;
import static
org.apache.flink.table.api.config.TableConfigOptions.TABLE_COLUMN_EXPANSION_STRATEGY;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link TableConfigOptions#TABLE_COLUMN_EXPANSION_STRATEGY}. */
class ColumnExpansionTest {
@@ -87,7 +97,7 @@ class ColumnExpansionTest {
tableEnv.getConfig()
.set(
TABLE_COLUMN_EXPANSION_STRATEGY,
-
Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+ List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
// From one table
assertColumnNames(
@@ -158,7 +168,7 @@ class ColumnExpansionTest {
tableEnv.getConfig()
.set(
TABLE_COLUMN_EXPANSION_STRATEGY,
-
Collections.singletonList(EXCLUDE_ALIASED_VIRTUAL_METADATA_COLUMNS));
+ List.of(EXCLUDE_ALIASED_VIRTUAL_METADATA_COLUMNS));
// From one table
assertColumnNames(
@@ -238,7 +248,7 @@ class ColumnExpansionTest {
tableEnv.getConfig()
.set(
TABLE_COLUMN_EXPANSION_STRATEGY,
-
Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+ List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
// t3_m_virtual is selected due to expansion of the explicit table
expression
// with hints from descriptor
@@ -265,7 +275,7 @@ class ColumnExpansionTest {
tableEnv.getConfig()
.set(
TABLE_COLUMN_EXPANSION_STRATEGY,
-
Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+ List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
// t3_m_virtual is selected due to expansion of the explicit table
expression
// with hints from descriptor
@@ -293,7 +303,7 @@ class ColumnExpansionTest {
tableEnv.getConfig()
.set(
TABLE_COLUMN_EXPANSION_STRATEGY,
-
Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+ List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
tableEnv.executeSql(
"CREATE TABLE sink (\n"
@@ -318,7 +328,7 @@ class ColumnExpansionTest {
tableEnv.getConfig()
.set(
TABLE_COLUMN_EXPANSION_STRATEGY,
-
Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+ List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
tableEnv.executeSql(
"CREATE TABLE sink (\n"
@@ -346,7 +356,7 @@ class ColumnExpansionTest {
tableEnv.getConfig()
.set(
TABLE_COLUMN_EXPANSION_STRATEGY,
-
Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+ List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
// t3_m_virtual is selected due to expansion of the explicit table
expression
// with hints from descriptor
@@ -374,7 +384,7 @@ class ColumnExpansionTest {
tableEnv.getConfig()
.set(
TABLE_COLUMN_EXPANSION_STRATEGY,
-
Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+ List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
// t3_m_virtual is selected due to expansion of the explicit table
expression
// with hints from descriptor
@@ -396,4 +406,67 @@ class ColumnExpansionTest {
"t3_s",
"agg");
}
+
+ @Test
+ void testProcessTableFunctionWithOnTime() {
+ tableEnv.getConfig()
+ .set(
+ TABLE_COLUMN_EXPANSION_STRATEGY,
+ List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+
+ // Register PTF that requires on_time
+ tableEnv.createTemporarySystemFunction("singlePtf",
PassThroughPtf.class);
+ tableEnv.createTemporarySystemFunction("multiPtf",
MultiInputPtf.class);
+
+ // t3_m_virtual is not selected due to missing on_time descriptor
+ assertColumnNames("SELECT * FROM singlePtf(r => TABLE t3)", "t3_s",
"t3_i", "out");
+ assertColumnNames("SELECT * FROM singlePtf(TABLE t3)", "t3_s", "t3_i",
"out");
+
+ // t3_m_virtual is selected due to expansion of the explicit table
expression
+ // with hints from the on_time descriptor
+ assertColumnNames(
+ "SELECT * FROM singlePtf(r => TABLE t3, on_time =>
DESCRIPTOR(t3_m_virtual))",
+ "t3_s",
+ "t3_i",
+ "t3_m_virtual",
+ "out",
+ "rowtime");
+ assertColumnNames(
+ "SELECT * FROM singlePtf(TABLE t3, DESCRIPTOR(t3_m_virtual))",
+ "t3_s",
+ "t3_i",
+ "t3_m_virtual",
+ "out",
+ "rowtime");
+
+ assertThatThrownBy(
+ () ->
+ tableEnv.sqlQuery(
+ "SELECT * FROM multiPtf(TABLE t3,
TABLE t2, DESCRIPTOR(t3_m_virtual, t2_m_virtual))"))
+ // Message indicates that 't2_m_virtual' was correctly resolved
+ // and passed to PTF type inference
+ .hasRootCauseMessage(
+ "Unsupported data type for time attribute. The
`on_time` argument "
+ + "must reference a TIMESTAMP or TIMESTAMP_LTZ
column (up to "
+ + "precision 3). However, column
't2_m_virtual' in table "
+ + "argument 'r2' has data type 'INT'.");
+ }
+
+ @DataTypeHint("ROW<out STRING>")
+ public static class PassThroughPtf extends ProcessTableFunction<Row> {
+ @SuppressWarnings("unused")
+ public void eval(@ArgumentHint({ROW_SEMANTIC_TABLE,
PASS_COLUMNS_THROUGH}) Row r) {
+ // dummy
+ }
+ }
+
+ @DataTypeHint("ROW<out STRING>")
+ public static class MultiInputPtf extends ProcessTableFunction<Row> {
+ @SuppressWarnings("unused")
+ public void eval(
+ @ArgumentHint({SET_SEMANTIC_TABLE, OPTIONAL_PARTITION_BY}) Row
r1,
+ @ArgumentHint({SET_SEMANTIC_TABLE, OPTIONAL_PARTITION_BY}) Row
r2) {
+ // dummy
+ }
+ }
}