This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.3 by this push:
     new ce0e41e9421 [FLINK-37599][table] Support column expansion for PTF on_time columns
ce0e41e9421 is described below

commit ce0e41e9421e98f54df59a74e46a9fadb51f2e27
Author: Timo Walther <[email protected]>
AuthorDate: Tue Apr 21 15:51:47 2026 +0200

    [FLINK-37599][table] Support column expansion for PTF on_time columns
    
    This closes #27976.
---
 .../planner/calcite/FlinkCalciteSqlValidator.java  | 184 ++++++++++++++-------
 .../flink/table/planner/utils/ShortcutUtils.java   |  13 ++
 .../plan/stream/sql/ColumnExpansionTest.java       |  89 +++++++++-
 3 files changed, 216 insertions(+), 70 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
index a65cb882a43..77a1634fa0b 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
@@ -27,9 +27,9 @@ import 
org.apache.flink.table.api.config.TableConfigOptions.ColumnExpansionStrat
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.FunctionKind;
 import org.apache.flink.table.planner.catalog.CatalogSchemaModel;
 import org.apache.flink.table.planner.catalog.CatalogSchemaTable;
-import org.apache.flink.table.planner.functions.sql.ml.SqlMLTableFunction;
 import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader;
 import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
@@ -65,6 +65,8 @@ import org.apache.calcite.sql.SqlTableFunction;
 import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.SqlWindowTableFunction;
 import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlOperandMetadata;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
 import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.calcite.sql.validate.DelegatingScope;
 import org.apache.calcite.sql.validate.IdentifierNamespace;
@@ -92,7 +94,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL;
 import static 
org.apache.flink.table.expressions.resolver.lookups.FieldReferenceLookup.includeExpandedColumn;
@@ -343,7 +344,7 @@ public final class FlinkCalciteSqlValidator extends 
FlinkSqlParsingValidator {
                 final Column column = 
resolvedSchema.getColumn(columnName).orElse(null);
                 if (qualified.suffix().size() == 1 && column != null) {
                     if (includeExpandedColumn(column, 
columnExpansionStrategies)
-                            || declaredDescriptorColumn(scope, column)) {
+                            || isDeclaredOnTimeColumn(scope, column)) {
                         super.addToSelectList(
                                 list, aliases, fieldList, exp, scope, 
includeSystemVars);
                     }
@@ -360,15 +361,16 @@ public final class FlinkCalciteSqlValidator extends 
FlinkSqlParsingValidator {
     protected @PolyNull SqlNode performUnconditionalRewrites(
             @PolyNull SqlNode node, boolean underFrom) {
 
-        // Special case for window TVFs like:
-        // TUMBLE(TABLE t, DESCRIPTOR(metadata_virtual), INTERVAL '1' MINUTE)) 
or
-        // SESSION(TABLE t PARTITION BY a, DESCRIPTOR(metadata_virtual), 
INTERVAL '1' MINUTE))
+        // Capture table arguments early:
+        // TUMBLE(TABLE t, DESCRIPTOR(metadata_virtual), INTERVAL '1' MINUTE) 
or
+        // SESSION(TABLE t PARTITION BY a, DESCRIPTOR(metadata_virtual), 
INTERVAL '1' MINUTE)
+        // MyPtf(in => TABLE t PARTITION BY a, on_time => 
DESCRIPTOR(metadata_virtual))
         //
         // "TABLE t" is translated into an implicit "SELECT * FROM t". This 
would ignore columns
-        // that are not expanded by default. However, the descriptor 
explicitly states the need
-        // for this column. Therefore, explicit table expressions (for window 
TVFs at most one)
-        // are captured before rewriting and replaced with a "marker" 
SqlSelect that contains the
-        // descriptor information. The "marker" SqlSelect is considered during 
column expansion.
+        // that are not expanded by default. However, the on_time descriptor 
explicitly states the
+        // need for time columns. Therefore, explicit table expressions are 
captured before
+        // rewriting and replaced with a "marker" SqlSelect that contains the 
descriptor
+        // information. The "marker" SqlSelect is considered during column 
expansion.
         final List<SqlIdentifier> tableArgs = getTableOperands(node);
 
         final SqlNode rewritten = super.performUnconditionalRewrites(node, 
underFrom);
@@ -376,55 +378,54 @@ public final class FlinkCalciteSqlValidator extends 
FlinkSqlParsingValidator {
         if (!(node instanceof SqlBasicCall)) {
             return rewritten;
         }
+
         final SqlBasicCall call = (SqlBasicCall) node;
-        final SqlOperator operator = call.getOperator();
 
+        // Special case for MODEL
         if (node instanceof SqlExplicitModelCall) {
             // Convert it so that model can be accessed in planner. 
SqlExplicitModelCall
             // from parser can't access model.
-            SqlExplicitModelCall modelCall = (SqlExplicitModelCall) node;
-            SqlIdentifier modelIdentifier = modelCall.getModelIdentifier();
-            FlinkCalciteCatalogReader catalogReader =
+            final SqlExplicitModelCall modelCall = (SqlExplicitModelCall) node;
+            final SqlIdentifier modelIdentifier = 
modelCall.getModelIdentifier();
+            final FlinkCalciteCatalogReader catalogReader =
                     (FlinkCalciteCatalogReader) getCatalogReader();
-            CatalogSchemaModel model = 
catalogReader.getModel(modelIdentifier.names);
+            final CatalogSchemaModel model = 
catalogReader.getModel(modelIdentifier.names);
             if (model != null) {
                 return new SqlModelCall(modelCall, model);
             }
         }
 
-        // TODO (FLINK-37819): add test for SqlMLTableFunction
-        if (operator instanceof SqlWindowTableFunction || operator instanceof 
SqlMLTableFunction) {
-            if (tableArgs.stream().allMatch(Objects::isNull)) {
-                return rewritten;
-            }
-
-            final List<SqlIdentifier> descriptors =
-                    call.getOperandList().stream()
-                            
.flatMap(FlinkCalciteSqlValidator::extractDescriptors)
-                            .collect(Collectors.toList());
-
+        // Mark rewritten "TABLE t" with on_time columns
+        if (tableArgs == null || tableArgs.stream().allMatch(Objects::isNull)) 
{
+            return rewritten;
+        }
+        final List<SqlIdentifier> onTimeColumns = extractOnTime(call);
+        if (onTimeColumns != null) {
             for (int i = 0; i < call.operandCount(); i++) {
                 final SqlIdentifier tableArg = tableArgs.get(i);
                 if (tableArg != null) {
-                    final SqlNode opReplacement = new 
ExplicitTableSqlSelect(tableArg, descriptors);
+                    final SqlNode opReplacement =
+                            new ExplicitTableSqlSelect(tableArg, 
onTimeColumns);
+                    // for f(TABLE t PARTITION BY c, ...)
                     if (call.operand(i).getKind() == 
SqlKind.SET_SEMANTICS_TABLE) {
                         final SqlCall setSemanticsTable = call.operand(i);
                         setSemanticsTable.setOperand(0, opReplacement);
                     } else if (call.operand(i).getKind() == 
SqlKind.ARGUMENT_ASSIGNMENT) {
-                        // for TUMBLE(DATA => TABLE t3, ...)
                         final SqlCall assignment = call.operand(i);
+                        // for f(in => TABLE t PARTITION BY c, ...)
                         if (assignment.operand(0).getKind() == 
SqlKind.SET_SEMANTICS_TABLE) {
-                            final SqlCall setSemanticsTable = 
assignment.operand(i);
+                            final SqlCall setSemanticsTable = 
assignment.operand(0);
                             setSemanticsTable.setOperand(0, opReplacement);
                         } else {
+                            // for f(in => TABLE t, ...)
                             assignment.setOperand(0, opReplacement);
                         }
                     } else {
-                        // for TUMBLE(TABLE t3, ...)
+                        // for f(TABLE t, ...)
                         call.setOperand(i, opReplacement);
                     }
                 }
-                // for TUMBLE([DATA =>] SELECT ..., ...)
+                // for f([in =>] SELECT ..., ...)
             }
         }
 
@@ -446,9 +447,9 @@ public final class FlinkCalciteSqlValidator extends 
FlinkSqlParsingValidator {
      */
     static class ExplicitTableSqlSelect extends SqlSelect {
 
-        private final List<SqlIdentifier> descriptors;
+        private final List<SqlIdentifier> onTimeColumns;
 
-        public ExplicitTableSqlSelect(SqlIdentifier table, List<SqlIdentifier> 
descriptors) {
+        public ExplicitTableSqlSelect(SqlIdentifier table, List<SqlIdentifier> 
onTimeColumns) {
             super(
                     SqlParserPos.ZERO,
                     null,
@@ -462,7 +463,7 @@ public final class FlinkCalciteSqlValidator extends 
FlinkSqlParsingValidator {
                     null,
                     null,
                     null);
-            this.descriptors = descriptors;
+            this.onTimeColumns = onTimeColumns;
         }
     }
 
@@ -470,30 +471,32 @@ public final class FlinkCalciteSqlValidator extends 
FlinkSqlParsingValidator {
      * Returns whether the given column has been declared in a {@link 
SqlKind#DESCRIPTOR} next to a
      * {@link SqlKind#EXPLICIT_TABLE} within TVF operands.
      */
-    private static boolean declaredDescriptorColumn(SelectScope scope, Column 
column) {
+    private static boolean isDeclaredOnTimeColumn(SelectScope scope, Column 
column) {
         if (!(scope.getNode() instanceof ExplicitTableSqlSelect)) {
             return false;
         }
         final ExplicitTableSqlSelect select = (ExplicitTableSqlSelect) 
scope.getNode();
-        return select.descriptors.stream()
+        return select.onTimeColumns.stream()
                 .map(SqlIdentifier::getSimple)
                 .anyMatch(id -> id.equals(column.getName()));
     }
 
     /**
      * Returns all {@link SqlKind#EXPLICIT_TABLE} and {@link 
SqlKind#SET_SEMANTICS_TABLE} operands
-     * within TVF operands. A list entry is {@code null} if the operand is not 
an {@link
+     * within PTF operands. A list entry is {@code null} if the operand is not 
an {@link
      * SqlKind#EXPLICIT_TABLE} or {@link SqlKind#SET_SEMANTICS_TABLE}.
      */
     private static List<SqlIdentifier> getTableOperands(SqlNode node) {
         if (!(node instanceof SqlBasicCall)) {
             return null;
         }
+
         final SqlBasicCall call = (SqlBasicCall) node;
 
         if (!(call.getOperator() instanceof SqlFunction)) {
             return null;
         }
+
         final SqlFunction function = (SqlFunction) call.getOperator();
 
         if (!isTableFunction(function)) {
@@ -501,52 +504,109 @@ public final class FlinkCalciteSqlValidator extends 
FlinkSqlParsingValidator {
         }
 
         return call.getOperandList().stream()
-                .map(FlinkCalciteSqlValidator::extractTableOperand)
+                .map(FlinkCalciteSqlValidator::extractExplicitTables)
                 .collect(Collectors.toList());
     }
 
-    private static @Nullable SqlIdentifier extractTableOperand(SqlNode op) {
+    /** Extracts "TABLE t" nodes before they get rewritten into "SELECT * FROM 
t". */
+    private static @Nullable SqlIdentifier extractExplicitTables(SqlNode op) {
         if (op.getKind() == SqlKind.EXPLICIT_TABLE) {
             final SqlBasicCall opCall = (SqlBasicCall) op;
             if (opCall.operandCount() == 1 && opCall.operand(0) instanceof 
SqlIdentifier) {
-                // for TUMBLE(TABLE t3, ...)
+                // for f(TABLE t, ...)
                 return opCall.operand(0);
             }
         } else if (op.getKind() == SqlKind.SET_SEMANTICS_TABLE) {
-            // for SESSION windows
+            // for f(TABLE t PARTITION BY x)
             final SqlBasicCall opCall = (SqlBasicCall) op;
-            final SqlCall setSemanticsTable = opCall.operand(0);
-            if (setSemanticsTable.operand(0) instanceof SqlIdentifier) {
-                return setSemanticsTable.operand(0);
-            }
+            return extractExplicitTables(opCall.operand(0));
         } else if (op.getKind() == SqlKind.ARGUMENT_ASSIGNMENT) {
-            // for TUMBLE(DATA => TABLE t3, ...)
+            // for f(in => TABLE t, ...)
             final SqlBasicCall opCall = (SqlBasicCall) op;
-            return extractTableOperand(opCall.operand(0));
+            return extractExplicitTables(opCall.operand(0));
         }
         return null;
     }
 
-    private static Stream<SqlIdentifier> extractDescriptors(SqlNode op) {
+    /** Extracts the on_time argument of a PTF (or TIMECOL for window PTFs for 
legacy reasons). */
+    private static @Nullable List<SqlIdentifier> extractOnTime(SqlBasicCall 
call) {
+        // Extract the time operand from the PTF call
+        final SqlNode onTimeOperand;
+        if (call.getOperator() instanceof SqlWindowTableFunction) {
+            onTimeOperand = extractOperandByArgName(call, "TIMECOL");
+        } else if (ShortcutUtils.isFunctionKind(call.getOperator(), 
FunctionKind.PROCESS_TABLE)) {
+            onTimeOperand = extractOperandByArgName(call, "on_time");
+        } else {
+            onTimeOperand = null;
+        }
+
+        // No operand found
+        if (onTimeOperand == null) {
+            return null;
+        }
+
+        return extractDescriptors(onTimeOperand);
+    }
+
+    private static List<SqlIdentifier> extractDescriptors(SqlNode op) {
         if (op.getKind() == SqlKind.DESCRIPTOR) {
-            // for TUMBLE(..., DESCRIPTOR(col), ...)
             final SqlBasicCall opCall = (SqlBasicCall) op;
             return opCall.getOperandList().stream()
                     .filter(SqlIdentifier.class::isInstance)
-                    .map(SqlIdentifier.class::cast);
-        } else if (op.getKind() == SqlKind.SET_SEMANTICS_TABLE) {
-            // for SESSION windows
-            final SqlBasicCall opCall = (SqlBasicCall) op;
-            return ((SqlNodeList) opCall.operand(1))
-                    .stream()
-                            .filter(SqlIdentifier.class::isInstance)
-                            .map(SqlIdentifier.class::cast);
-        } else if (op.getKind() == SqlKind.ARGUMENT_ASSIGNMENT) {
-            // for TUMBLE(..., TIMECOL => DESCRIPTOR(col), ...)
-            final SqlBasicCall opCall = (SqlBasicCall) op;
-            return extractDescriptors(opCall.operand(0));
+                    .map(SqlIdentifier.class::cast)
+                    .collect(Collectors.toList());
+        }
+        return List.of();
+    }
+
+    /**
+     * Returns the operand for a given argument name from a SqlBasicCall. 
Supports both positional
+     * and named arguments. If at least one ARGUMENT_ASSIGNMENT is used, named 
lookup is performed.
+     * Otherwise, positional lookup using SqlOperandMetadata is used.
+     *
+     * @param call the SQL call to extract the operand from
+     * @param argumentName the name of the argument to retrieve
+     * @return the SqlNode for the operand, or null if not found or not 
supported
+     */
+    private static @Nullable SqlNode extractOperandByArgName(
+            SqlBasicCall call, String argumentName) {
+        // Check if operator supports SqlOperandMetadata
+        final SqlOperator operator = call.getOperator();
+        final SqlOperandTypeChecker typeChecker = 
operator.getOperandTypeChecker();
+        if (!(typeChecker instanceof SqlOperandMetadata)) {
+            return null;
+        }
+
+        final SqlOperandMetadata operandMetadata = (SqlOperandMetadata) 
typeChecker;
+
+        // Detect if named arguments are used by checking for 
ARGUMENT_ASSIGNMENT
+        final List<SqlNode> operands = call.getOperandList();
+        final boolean hasNamedArguments =
+                operands.stream().anyMatch(op -> op.getKind() == 
SqlKind.ARGUMENT_ASSIGNMENT);
+
+        if (hasNamedArguments) {
+            // Named mode: search through ARGUMENT_ASSIGNMENT nodes
+            for (SqlNode operand : operands) {
+                if (operand.getKind() == SqlKind.ARGUMENT_ASSIGNMENT) {
+                    final SqlBasicCall assignment = (SqlBasicCall) operand;
+                    // operand(1) contains the parameter name as SqlIdentifier
+                    final SqlIdentifier paramName = assignment.operand(1);
+                    if (paramName.getSimple().equals(argumentName)) {
+                        // operand(0) contains the actual value
+                        return assignment.operand(0);
+                    }
+                }
+            }
+            return null;
+        } else {
+            // Positional mode: use SqlOperandMetadata to map name to position
+            final List<String> paramNames = operandMetadata.paramNames();
+            final int index = paramNames.indexOf(argumentName);
+            if (index == -1 || index >= call.operandCount()) {
+                return null;
+            }
+            return call.operand(index);
         }
-        return Stream.empty();
     }
 
     private static boolean isTableFunction(SqlFunction function) {
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
index 38bc5eb038d..415b78efeac 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
 import org.apache.flink.table.planner.calcite.FlinkContext;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.delegation.PlannerBase;
@@ -156,6 +157,18 @@ public final class ShortcutUtils {
         return ((BridgingSqlFunction) call.getOperator()).getDefinition();
     }
 
+    public static @Nullable FunctionDefinition 
unwrapFunctionDefinition(SqlOperator operator) {
+        if (!(operator instanceof BridgingSqlFunction)) {
+            return null;
+        }
+        return ((BridgingSqlFunction) operator).getDefinition();
+    }
+
+    public static boolean isFunctionKind(SqlOperator operator, FunctionKind 
kind) {
+        final FunctionDefinition functionDefinition = 
unwrapFunctionDefinition(operator);
+        return functionDefinition != null && functionDefinition.getKind() == 
kind;
+    }
+
     public static @Nullable BridgingSqlFunction 
unwrapBridgingSqlFunction(RexCall call) {
         final SqlOperator operator = call.getOperator();
         if (operator instanceof BridgingSqlFunction) {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
index 19ea817ba20..79d1c3eacc2 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
@@ -18,20 +18,30 @@
 
 package org.apache.flink.table.planner.plan.stream.sql;
 
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.types.Row;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
+import static 
org.apache.flink.table.annotation.ArgumentTrait.OPTIONAL_PARTITION_BY;
+import static 
org.apache.flink.table.annotation.ArgumentTrait.PASS_COLUMNS_THROUGH;
+import static 
org.apache.flink.table.annotation.ArgumentTrait.ROW_SEMANTIC_TABLE;
+import static 
org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;
 import static 
org.apache.flink.table.api.config.TableConfigOptions.ColumnExpansionStrategy.EXCLUDE_ALIASED_VIRTUAL_METADATA_COLUMNS;
 import static 
org.apache.flink.table.api.config.TableConfigOptions.ColumnExpansionStrategy.EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS;
 import static 
org.apache.flink.table.api.config.TableConfigOptions.TABLE_COLUMN_EXPANSION_STRATEGY;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link TableConfigOptions#TABLE_COLUMN_EXPANSION_STRATEGY}. */
 class ColumnExpansionTest {
@@ -87,7 +97,7 @@ class ColumnExpansionTest {
         tableEnv.getConfig()
                 .set(
                         TABLE_COLUMN_EXPANSION_STRATEGY,
-                        
Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+                        List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
 
         // From one table
         assertColumnNames(
@@ -158,7 +168,7 @@ class ColumnExpansionTest {
         tableEnv.getConfig()
                 .set(
                         TABLE_COLUMN_EXPANSION_STRATEGY,
-                        
Collections.singletonList(EXCLUDE_ALIASED_VIRTUAL_METADATA_COLUMNS));
+                        List.of(EXCLUDE_ALIASED_VIRTUAL_METADATA_COLUMNS));
 
         // From one table
         assertColumnNames(
@@ -238,7 +248,7 @@ class ColumnExpansionTest {
         tableEnv.getConfig()
                 .set(
                         TABLE_COLUMN_EXPANSION_STRATEGY,
-                        
Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+                        List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
 
         // t3_m_virtual is selected due to expansion of the explicit table 
expression
         // with hints from descriptor
@@ -265,7 +275,7 @@ class ColumnExpansionTest {
         tableEnv.getConfig()
                 .set(
                         TABLE_COLUMN_EXPANSION_STRATEGY,
-                        
Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+                        List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
 
         // t3_m_virtual is selected due to expansion of the explicit table 
expression
         // with hints from descriptor
@@ -293,7 +303,7 @@ class ColumnExpansionTest {
         tableEnv.getConfig()
                 .set(
                         TABLE_COLUMN_EXPANSION_STRATEGY,
-                        
Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+                        List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
 
         tableEnv.executeSql(
                 "CREATE TABLE sink (\n"
@@ -318,7 +328,7 @@ class ColumnExpansionTest {
         tableEnv.getConfig()
                 .set(
                         TABLE_COLUMN_EXPANSION_STRATEGY,
-                        
Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+                        List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
 
         tableEnv.executeSql(
                 "CREATE TABLE sink (\n"
@@ -346,7 +356,7 @@ class ColumnExpansionTest {
         tableEnv.getConfig()
                 .set(
                         TABLE_COLUMN_EXPANSION_STRATEGY,
-                        
Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+                        List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
 
         // t3_m_virtual is selected due to expansion of the explicit table 
expression
         // with hints from descriptor
@@ -374,7 +384,7 @@ class ColumnExpansionTest {
         tableEnv.getConfig()
                 .set(
                         TABLE_COLUMN_EXPANSION_STRATEGY,
-                        
Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+                        List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
 
         // t3_m_virtual is selected due to expansion of the explicit table 
expression
         // with hints from descriptor
@@ -396,4 +406,67 @@ class ColumnExpansionTest {
                 "t3_s",
                 "agg");
     }
+
+    @Test
+    void testProcessTableFunctionWithOnTime() {
+        tableEnv.getConfig()
+                .set(
+                        TABLE_COLUMN_EXPANSION_STRATEGY,
+                        List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+
+        // Register PTF that requires on_time
+        tableEnv.createTemporarySystemFunction("singlePtf", 
PassThroughPtf.class);
+        tableEnv.createTemporarySystemFunction("multiPtf", 
MultiInputPtf.class);
+
+        // t3_m_virtual is not selected due to missing on_time descriptor
+        assertColumnNames("SELECT * FROM singlePtf(r => TABLE t3)", "t3_s", 
"t3_i", "out");
+        assertColumnNames("SELECT * FROM singlePtf(TABLE t3)", "t3_s", "t3_i", 
"out");
+
+        // t3_m_virtual is selected due to expansion of the explicit table 
expression
+        // with hints from the on_time descriptor
+        assertColumnNames(
+                "SELECT * FROM singlePtf(r => TABLE t3, on_time => 
DESCRIPTOR(t3_m_virtual))",
+                "t3_s",
+                "t3_i",
+                "t3_m_virtual",
+                "out",
+                "rowtime");
+        assertColumnNames(
+                "SELECT * FROM singlePtf(TABLE t3, DESCRIPTOR(t3_m_virtual))",
+                "t3_s",
+                "t3_i",
+                "t3_m_virtual",
+                "out",
+                "rowtime");
+
+        assertThatThrownBy(
+                        () ->
+                                tableEnv.sqlQuery(
+                                        "SELECT * FROM multiPtf(TABLE t3, 
TABLE t2, DESCRIPTOR(t3_m_virtual, t2_m_virtual))"))
+                // Message indicates that 't2_m_virtual' was correctly resolved
+                // and passed to PTF type inference
+                .hasRootCauseMessage(
+                        "Unsupported data type for time attribute. The 
`on_time` argument "
+                                + "must reference a TIMESTAMP or TIMESTAMP_LTZ 
column (up to "
+                                + "precision 3). However, column 
't2_m_virtual' in table "
+                                + "argument 'r2' has data type 'INT'.");
+    }
+
+    @DataTypeHint("ROW<out STRING>")
+    public static class PassThroughPtf extends ProcessTableFunction<Row> {
+        @SuppressWarnings("unused")
+        public void eval(@ArgumentHint({ROW_SEMANTIC_TABLE, 
PASS_COLUMNS_THROUGH}) Row r) {
+            // dummy
+        }
+    }
+
+    @DataTypeHint("ROW<out STRING>")
+    public static class MultiInputPtf extends ProcessTableFunction<Row> {
+        @SuppressWarnings("unused")
+        public void eval(
+                @ArgumentHint({SET_SEMANTIC_TABLE, OPTIONAL_PARTITION_BY}) Row 
r1,
+                @ArgumentHint({SET_SEMANTIC_TABLE, OPTIONAL_PARTITION_BY}) Row 
r2) {
+            // dummy
+        }
+    }
 }

Reply via email to