This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e622205d0b7 [FLINK-34476][table-planner] Consider assignment operator during TVF column expansion
e622205d0b7 is described below

commit e622205d0b74c6cbaf6fef6d8c11a397cdc30284
Author: Timo Walther <twal...@apache.org>
AuthorDate: Thu Feb 22 10:56:36 2024 +0100

    [FLINK-34476][table-planner] Consider assignment operator during TVF column expansion
---
 .../planner/calcite/FlinkCalciteSqlValidator.java  | 63 +++++++++++++++-------
 .../plan/stream/sql/ColumnExpansionTest.java       | 28 ++++++++++
 2 files changed, 72 insertions(+), 19 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
index a8cd9265e69..f091ab3e70a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
@@ -42,6 +42,7 @@ import org.apache.calcite.schema.SchemaVersion;
 import org.apache.calcite.sql.JoinType;
 import org.apache.calcite.sql.SqlAsOperator;
 import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlIdentifier;
@@ -85,6 +86,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL;
 import static 
org.apache.flink.table.expressions.resolver.lookups.FieldReferenceLookup.includeExpandedColumn;
@@ -363,20 +365,23 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl {
 
             final List<SqlIdentifier> descriptors =
                     call.getOperandList().stream()
-                            .filter(op -> op.getKind() == SqlKind.DESCRIPTOR)
-                            .flatMap(
-                                    desc ->
-                                            ((SqlBasicCall) desc)
-                                                    .getOperandList().stream()
-                                                            
.filter(SqlIdentifier.class::isInstance)
-                                                            
.map(SqlIdentifier.class::cast))
+                            
.flatMap(FlinkCalciteSqlValidator::extractDescriptors)
                             .collect(Collectors.toList());
 
             for (int i = 0; i < call.operandCount(); i++) {
                 final SqlIdentifier tableArg = explicitTableArgs.get(i);
                 if (tableArg != null) {
-                    call.setOperand(i, new ExplicitTableSqlSelect(tableArg, 
descriptors));
+                    final SqlNode opReplacement = new 
ExplicitTableSqlSelect(tableArg, descriptors);
+                    if (call.operand(i).getKind() == 
SqlKind.ARGUMENT_ASSIGNMENT) {
+                        // for TUMBLE(DATA => TABLE t3, ...)
+                        final SqlCall assignment = call.operand(i);
+                        assignment.setOperand(0, opReplacement);
+                    } else {
+                        // for TUMBLE(TABLE t3, ...)
+                        call.setOperand(i, opReplacement);
+                    }
                 }
+                // for TUMBLE([DATA =>] SELECT ..., ...)
             }
         }
 
@@ -447,20 +452,40 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl {
         }
 
         return call.getOperandList().stream()
-                .map(
-                        op -> {
-                            if (op.getKind() == SqlKind.EXPLICIT_TABLE) {
-                                final SqlBasicCall opCall = (SqlBasicCall) op;
-                                if (opCall.operandCount() == 1
-                                        && opCall.operand(0) instanceof 
SqlIdentifier) {
-                                    return (SqlIdentifier) opCall.operand(0);
-                                }
-                            }
-                            return null;
-                        })
+                .map(FlinkCalciteSqlValidator::extractExplicitTable)
                 .collect(Collectors.toList());
     }
 
+    private static @Nullable SqlIdentifier extractExplicitTable(SqlNode op) {
+        if (op.getKind() == SqlKind.EXPLICIT_TABLE) {
+            final SqlBasicCall opCall = (SqlBasicCall) op;
+            if (opCall.operandCount() == 1 && opCall.operand(0) instanceof SqlIdentifier) {
+                // positional form, e.g. TUMBLE(TABLE t3, ...): return the single identifier operand
+                return opCall.operand(0);
+            }
+        } else if (op.getKind() == SqlKind.ARGUMENT_ASSIGNMENT) {
+            // named form, e.g. TUMBLE(DATA => TABLE t3, ...): recurse into the assignment's first operand
+            final SqlBasicCall opCall = (SqlBasicCall) op;
+            return extractExplicitTable(opCall.operand(0));
+        }
+        return null; // no explicit table identifier found in this operand
+    }
+
+    private static Stream<SqlIdentifier> extractDescriptors(SqlNode op) {
+        if (op.getKind() == SqlKind.DESCRIPTOR) {
+            // positional form, e.g. TUMBLE(..., DESCRIPTOR(col), ...): keep only identifier operands
+            final SqlBasicCall opCall = (SqlBasicCall) op;
+            return opCall.getOperandList().stream()
+                    .filter(SqlIdentifier.class::isInstance)
+                    .map(SqlIdentifier.class::cast);
+        } else if (op.getKind() == SqlKind.ARGUMENT_ASSIGNMENT) {
+            // named form, e.g. TUMBLE(..., TIMECOL => DESCRIPTOR(col), ...): recurse into the first operand
+            final SqlBasicCall opCall = (SqlBasicCall) op;
+            return extractDescriptors(opCall.operand(0));
+        }
+        return Stream.empty(); // operand is neither a descriptor nor an assignment
+    }
+
     private static boolean isTableFunction(SqlFunction function) {
         return function instanceof SqlTableFunction
                 || function.getFunctionType() == 
SqlFunctionCategory.USER_DEFINED_TABLE_FUNCTION;
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
index 7062d7ddd4a..f51da37fea8 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
@@ -289,4 +289,32 @@ class ColumnExpansionTest {
         assertThat(tableEnv.sqlQuery(sql).getResolvedSchema().getColumnNames())
                 .containsExactly(columnNames);
     }
+
+    @Test
+    void testExplicitTableWithinTableFunctionWithNamedArgs() {
+        tableEnv.getConfig()
+                .set(
+                        TABLE_COLUMN_EXPANSION_STRATEGY,
+                        Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+
+        // t3_m_virtual is selected due to expansion of the explicit table expression
+        // with hints from descriptor
+        assertColumnNames(
+                "SELECT * FROM TABLE("
+                        + "TUMBLE(DATA => TABLE t3, TIMECOL => DESCRIPTOR(t3_m_virtual), SIZE => INTERVAL '1' MINUTE))",
+                "t3_s",
+                "t3_i",
+                "t3_m_virtual",
+                "window_start",
+                "window_end",
+                "window_time");
+
+        // Test common window TVF syntax
+        assertColumnNames(
+                "SELECT t3_s, SUM(t3_i) AS agg "
+                        + "FROM TABLE(TUMBLE(DATA => TABLE t3, TIMECOL => DESCRIPTOR(t3_m_virtual), SIZE => INTERVAL '1' MINUTE)) "
+                        + "GROUP BY t3_s, window_start, window_end",
+                "t3_s",
+                "agg");
+    }
 }

Reply via email to