This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new e622205d0b7 [FLINK-34476][table-planner] Consider assignment operator during TVF column expansion e622205d0b7 is described below commit e622205d0b74c6cbaf6fef6d8c11a397cdc30284 Author: Timo Walther <twal...@apache.org> AuthorDate: Thu Feb 22 10:56:36 2024 +0100 [FLINK-34476][table-planner] Consider assignment operator during TVF column expansion --- .../planner/calcite/FlinkCalciteSqlValidator.java | 63 +++++++++++++++------- .../plan/stream/sql/ColumnExpansionTest.java | 28 ++++++++++ 2 files changed, 72 insertions(+), 19 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java index a8cd9265e69..f091ab3e70a 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java @@ -42,6 +42,7 @@ import org.apache.calcite.schema.SchemaVersion; import org.apache.calcite.sql.JoinType; import org.apache.calcite.sql.SqlAsOperator; import org.apache.calcite.sql.SqlBasicCall; +import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlFunction; import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlIdentifier; @@ -85,6 +86,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL; import static org.apache.flink.table.expressions.resolver.lookups.FieldReferenceLookup.includeExpandedColumn; @@ -363,20 +365,23 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl { final List<SqlIdentifier> descriptors = call.getOperandList().stream() - .filter(op -> op.getKind() == SqlKind.DESCRIPTOR) - .flatMap( - desc -> - ((SqlBasicCall) desc) - .getOperandList().stream() - .filter(SqlIdentifier.class::isInstance) - .map(SqlIdentifier.class::cast)) + .flatMap(FlinkCalciteSqlValidator::extractDescriptors) .collect(Collectors.toList()); for (int i = 0; i < call.operandCount(); i++) { final SqlIdentifier tableArg = explicitTableArgs.get(i); if (tableArg != null) { - call.setOperand(i, new ExplicitTableSqlSelect(tableArg, descriptors)); + final SqlNode opReplacement = new ExplicitTableSqlSelect(tableArg, descriptors); + if (call.operand(i).getKind() == SqlKind.ARGUMENT_ASSIGNMENT) { + // for TUMBLE(DATA => TABLE t3, ...) + final SqlCall assignment = call.operand(i); + assignment.setOperand(0, opReplacement); + } else { + // for TUMBLE(TABLE t3, ...) + call.setOperand(i, opReplacement); + } } + // for TUMBLE([DATA =>] SELECT ..., ...) } } @@ -447,20 +452,40 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl { } return call.getOperandList().stream() - .map( - op -> { - if (op.getKind() == SqlKind.EXPLICIT_TABLE) { - final SqlBasicCall opCall = (SqlBasicCall) op; - if (opCall.operandCount() == 1 - && opCall.operand(0) instanceof SqlIdentifier) { - return (SqlIdentifier) opCall.operand(0); - } - } - return null; - }) + .map(FlinkCalciteSqlValidator::extractExplicitTable) .collect(Collectors.toList()); } + private static @Nullable SqlIdentifier extractExplicitTable(SqlNode op) { + if (op.getKind() == SqlKind.EXPLICIT_TABLE) { + final SqlBasicCall opCall = (SqlBasicCall) op; + if (opCall.operandCount() == 1 && opCall.operand(0) instanceof SqlIdentifier) { + // for TUMBLE(TABLE t3, ...) + return opCall.operand(0); + } + } else if (op.getKind() == SqlKind.ARGUMENT_ASSIGNMENT) { + // for TUMBLE(DATA => TABLE t3, ...) + final SqlBasicCall opCall = (SqlBasicCall) op; + return extractExplicitTable(opCall.operand(0)); + } + return null; + } + + private static Stream<SqlIdentifier> extractDescriptors(SqlNode op) { + if (op.getKind() == SqlKind.DESCRIPTOR) { + // for TUMBLE(..., DESCRIPTOR(col), ...) + final SqlBasicCall opCall = (SqlBasicCall) op; + return opCall.getOperandList().stream() + .filter(SqlIdentifier.class::isInstance) + .map(SqlIdentifier.class::cast); + } else if (op.getKind() == SqlKind.ARGUMENT_ASSIGNMENT) { + // for TUMBLE(..., TIMECOL => DESCRIPTOR(col), ...) + final SqlBasicCall opCall = (SqlBasicCall) op; + return extractDescriptors(opCall.operand(0)); + } + return Stream.empty(); + } + private static boolean isTableFunction(SqlFunction function) { return function instanceof SqlTableFunction || function.getFunctionType() == SqlFunctionCategory.USER_DEFINED_TABLE_FUNCTION; diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java index 7062d7ddd4a..f51da37fea8 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java @@ -289,4 +289,32 @@ class ColumnExpansionTest { assertThat(tableEnv.sqlQuery(sql).getResolvedSchema().getColumnNames()) .containsExactly(columnNames); } + + @Test + void testExplicitTableWithinTableFunctionWithNamedArgs() { + tableEnv.getConfig() + .set( + TABLE_COLUMN_EXPANSION_STRATEGY, + Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS)); + + // t3_m_virtual is selected due to expansion of the explicit table expression + // with hints from descriptor + assertColumnNames( + "SELECT * FROM TABLE(" + + "TUMBLE(DATA => TABLE t3, TIMECOL => DESCRIPTOR(t3_m_virtual), SIZE => INTERVAL '1' MINUTE))", + "t3_s", + "t3_i", + "t3_m_virtual", + "window_start", + "window_end", + "window_time"); + + // Test common window TVF syntax + assertColumnNames( + "SELECT t3_s, SUM(t3_i) AS agg " + + "FROM TABLE(TUMBLE(DATA => TABLE t3, TIMECOL => DESCRIPTOR(t3_m_virtual), SIZE => INTERVAL '1' MINUTE)) " + + "GROUP BY t3_s, window_start, window_end", + "t3_s", + "agg"); + } }