This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new b25b57c55d9 [FLINK-33169] Consider descriptor information during system column expansion b25b57c55d9 is described below commit b25b57c55d903e4fdd2b666de49c90bfbad8fa99 Author: Timo Walther <twal...@apache.org> AuthorDate: Fri Sep 29 15:25:17 2023 -0700 [FLINK-33169] Consider descriptor information during system column expansion --- .../planner/calcite/FlinkCalciteSqlValidator.java | 150 ++++++++++++++++++++- .../plan/stream/sql/ColumnExpansionTest.java | 38 ++++++ 2 files changed, 182 insertions(+), 6 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java index c5acdd99f54..0b0075a4f64 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java @@ -43,15 +43,20 @@ import org.apache.calcite.sql.JoinType; import org.apache.calcite.sql.SqlAsOperator; import org.apache.calcite.sql.SqlBasicCall; import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlJoin; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.SqlSelect; import org.apache.calcite.sql.SqlSnapshot; import org.apache.calcite.sql.SqlUtil; import org.apache.calcite.sql.SqlWindowTableFunction; +import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.sql.validate.DelegatingScope; import org.apache.calcite.sql.validate.IdentifierNamespace; import org.apache.calcite.sql.validate.IdentifierSnapshotNamespace; @@ -67,6 +72,7 @@ import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.util.Static; import org.apache.calcite.util.TimestampString; import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.checker.nullness.qual.PolyNull; import java.math.BigDecimal; import java.time.ZoneId; @@ -74,8 +80,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL; import static org.apache.flink.table.expressions.resolver.lookups.FieldReferenceLookup.includeExpandedColumn; @@ -95,6 +103,8 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl { private final FrameworkConfig frameworkConfig; + private final List<ColumnExpansionStrategy> columnExpansionStrategies; + public FlinkCalciteSqlValidator( SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader, @@ -107,6 +117,9 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl { this.relOptCluster = relOptCluster; this.toRelContext = toRelcontext; this.frameworkConfig = frameworkConfig; + this.columnExpansionStrategies = + ShortcutUtils.unwrapTableConfig(relOptCluster) + .get(TableConfigOptions.TABLE_COLUMN_EXPANSION_STRATEGY); } public void setExpectedOutputType(SqlNode sqlNode, RelDataType expectedOutputType) { @@ -145,7 +158,7 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl { SqlNode operand0 = call.operand(0); if (operand0 instanceof SqlBasicCall && ((SqlBasicCall) operand0).getOperator() - instanceof SqlWindowTableFunction) { + instanceof org.apache.calcite.sql.SqlWindowTableFunction) { return; } } @@ -295,11 +308,8 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl { SqlNode exp, SelectScope scope, boolean includeSystemVars) { - final List<ColumnExpansionStrategy> strategies = - ShortcutUtils.unwrapTableConfig(relOptCluster) - .get(TableConfigOptions.TABLE_COLUMN_EXPANSION_STRATEGY); // Extract column's origin to apply strategy - if (!strategies.isEmpty() && exp instanceof SqlIdentifier) { + if (!columnExpansionStrategies.isEmpty() && exp instanceof SqlIdentifier) { final SqlQualified qualified = scope.fullyQualify((SqlIdentifier) exp); if (qualified.namespace != null && qualified.namespace.getTable() != null) { final CatalogSchemaTable schemaTable = @@ -309,7 +319,8 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl { final String columnName = qualified.suffix().get(0); final Column column = resolvedSchema.getColumn(columnName).orElse(null); if (qualified.suffix().size() == 1 && column != null) { - if (includeExpandedColumn(column, strategies)) { + if (includeExpandedColumn(column, columnExpansionStrategies) + || declaredDescriptorColumn(scope, column)) { super.addToSelectList( list, aliases, fieldList, exp, scope, includeSystemVars); } @@ -321,4 +332,131 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl { // Always add to list super.addToSelectList(list, aliases, fieldList, exp, scope, includeSystemVars); } + + @Override + protected @PolyNull SqlNode performUnconditionalRewrites( + @PolyNull SqlNode node, boolean underFrom) { + + // Special case for window TVFs like: + // TUMBLE(TABLE t, DESCRIPTOR(metadata_virtual), INTERVAL '1' MINUTE)) + // + // "TABLE t" is translated into an implicit "SELECT * FROM t". This would ignore columns + // that are not expanded by default. However, the descriptor explicitly states the need + // for this column. Therefore, explicit table expressions (for window TVFs at most one) + // are captured before rewriting and replaced with a "marker" SqlSelect that contains the + // descriptor information. The "marker" SqlSelect is considered during column expansion. + final List<SqlIdentifier> explicitTableArgs = getExplicitTableOperands(node); + + final SqlNode rewritten = super.performUnconditionalRewrites(node, underFrom); + + if (!(node instanceof SqlBasicCall)) { + return rewritten; + } + final SqlBasicCall call = (SqlBasicCall) node; + final SqlOperator operator = call.getOperator(); + + if (operator instanceof SqlWindowTableFunction) { + if (explicitTableArgs.stream().allMatch(Objects::isNull)) { + return rewritten; + } + + final List<SqlIdentifier> descriptors = + call.getOperandList().stream() + .filter(op -> op.getKind() == SqlKind.DESCRIPTOR) + .flatMap( + desc -> + ((SqlBasicCall) desc) + .getOperandList().stream() + .filter(SqlIdentifier.class::isInstance) + .map(SqlIdentifier.class::cast)) + .collect(Collectors.toList()); + + for (int i = 0; i < call.operandCount(); i++) { + final SqlIdentifier tableArg = explicitTableArgs.get(i); + if (tableArg != null) { + call.setOperand(i, new ExplicitTableSqlSelect(tableArg, descriptors)); + } + } + } + + return rewritten; + } + + // -------------------------------------------------------------------------------------------- + // Column expansion + // -------------------------------------------------------------------------------------------- + + /** + * A special {@link SqlSelect} to capture the origin of a {@link SqlKind#EXPLICIT_TABLE} within + * TVF operands. + */ + private static class ExplicitTableSqlSelect extends SqlSelect { + + private final List<SqlIdentifier> descriptors; + + public ExplicitTableSqlSelect(SqlIdentifier table, List<SqlIdentifier> descriptors) { + super( + SqlParserPos.ZERO, + null, + SqlNodeList.of(SqlIdentifier.star(SqlParserPos.ZERO)), + table, + null, + null, + null, + null, + null, + null, + null, + null); + this.descriptors = descriptors; + } + } + + /** + * Returns whether the given column has been declared in a {@link SqlKind#DESCRIPTOR} next to a + * {@link SqlKind#EXPLICIT_TABLE} within TVF operands. + */ + private static boolean declaredDescriptorColumn(SelectScope scope, Column column) { + if (!(scope.getNode() instanceof ExplicitTableSqlSelect)) { + return false; + } + final ExplicitTableSqlSelect select = (ExplicitTableSqlSelect) scope.getNode(); + return select.descriptors.stream() + .map(SqlIdentifier::getSimple) + .anyMatch(id -> id.equals(column.getName())); + } + + /** + * Returns all {@link SqlKind#EXPLICIT_TABLE} operands within TVF operands. A list entry is + * {@code null} if the operand is not an {@link SqlKind#EXPLICIT_TABLE}. + */ + private static List<SqlIdentifier> getExplicitTableOperands(SqlNode node) { + if (!(node instanceof SqlBasicCall)) { + return null; + } + final SqlBasicCall call = (SqlBasicCall) node; + + if (!(call.getOperator() instanceof SqlFunction)) { + return null; + } + final SqlFunction function = (SqlFunction) call.getOperator(); + + if (function.getFunctionType() != SqlFunctionCategory.USER_DEFINED_TABLE_FUNCTION) { + return null; + } + + return call.getOperandList().stream() + .map( + op -> { + if (op.getKind() == SqlKind.EXPLICIT_TABLE) { + final SqlBasicCall opCall = (SqlBasicCall) op; + if (opCall.operandCount() == 1 + && opCall.operand(0) instanceof SqlIdentifier) { + return (SqlIdentifier) opCall.operand(0); + } + } + return null; + }) + .collect(Collectors.toList()); + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java index 43cccf6e7f5..290b981583e 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java @@ -68,6 +68,17 @@ public class ColumnExpansionTest { + " 'readable-metadata' = 't2_m_virtual:INT,k1:STRING,t2_m_default:INT,k2:STRING'\n" + ")"); + tableEnv.executeSql( + "CREATE TABLE t3 (\n" + + " t3_s STRING,\n" + + " t3_i INT,\n" + + " t3_m_virtual TIMESTAMP_LTZ(3) METADATA VIRTUAL,\n" + + " WATERMARK FOR t3_m_virtual AS t3_m_virtual - INTERVAL '1' SECOND\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'readable-metadata' = 't3_m_virtual:TIMESTAMP_LTZ(3)'\n" + + ")"); + tableEnv.getConfig().set(TABLE_COLUMN_EXPANSION_STRATEGY, Collections.emptyList()); } @@ -222,6 +233,33 @@ public class ColumnExpansionTest { assertColumnNames("SELECT * FROM v1", "t1_i", "t1_s", "t1_m_default", "t1_m_aliased"); } + @Test + public void testExplicitTableWithinTableFunction() { + tableEnv.getConfig() + .set( + TABLE_COLUMN_EXPANSION_STRATEGY, + Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS)); + + // t3_m_virtual is selected due to expansion of the explicit table expression + // with hints from descriptor + assertColumnNames( + "SELECT * FROM TABLE(TUMBLE(TABLE t3, DESCRIPTOR(t3_m_virtual), INTERVAL '1' MINUTE))", + "t3_s", + "t3_i", + "t3_m_virtual", + "window_start", + "window_end", + "window_time"); + + // Test common window TVF syntax + assertColumnNames( + "SELECT t3_s, SUM(t3_i) AS agg " + + "FROM TABLE(TUMBLE(TABLE t3, DESCRIPTOR(t3_m_virtual), INTERVAL '1' MINUTE)) " + + "GROUP BY t3_s, window_start, window_end", + "t3_s", + "agg"); + } + private void assertColumnNames(String sql, String... columnNames) { assertThat(tableEnv.sqlQuery(sql).getResolvedSchema().getColumnNames()) .containsExactly(columnNames);