This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b25b57c55d9 [FLINK-33169] Consider descriptor information during system column expansion
b25b57c55d9 is described below

commit b25b57c55d903e4fdd2b666de49c90bfbad8fa99
Author: Timo Walther <twal...@apache.org>
AuthorDate: Fri Sep 29 15:25:17 2023 -0700

    [FLINK-33169] Consider descriptor information during system column expansion
---
 .../planner/calcite/FlinkCalciteSqlValidator.java  | 150 ++++++++++++++++++++-
 .../plan/stream/sql/ColumnExpansionTest.java       |  38 ++++++
 2 files changed, 182 insertions(+), 6 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
index c5acdd99f54..0b0075a4f64 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
@@ -43,15 +43,20 @@ import org.apache.calcite.sql.JoinType;
 import org.apache.calcite.sql.SqlAsOperator;
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlJoin;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.SqlSnapshot;
 import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.SqlWindowTableFunction;
+import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.validate.DelegatingScope;
 import org.apache.calcite.sql.validate.IdentifierNamespace;
 import org.apache.calcite.sql.validate.IdentifierSnapshotNamespace;
@@ -67,6 +72,7 @@ import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.util.Static;
 import org.apache.calcite.util.TimestampString;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.PolyNull;
 
 import java.math.BigDecimal;
 import java.time.ZoneId;
@@ -74,8 +80,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL;
 import static 
org.apache.flink.table.expressions.resolver.lookups.FieldReferenceLookup.includeExpandedColumn;
@@ -95,6 +103,8 @@ public final class FlinkCalciteSqlValidator extends 
SqlValidatorImpl {
 
     private final FrameworkConfig frameworkConfig;
 
+    private final List<ColumnExpansionStrategy> columnExpansionStrategies;
+
     public FlinkCalciteSqlValidator(
             SqlOperatorTable opTab,
             SqlValidatorCatalogReader catalogReader,
@@ -107,6 +117,9 @@ public final class FlinkCalciteSqlValidator extends 
SqlValidatorImpl {
         this.relOptCluster = relOptCluster;
         this.toRelContext = toRelcontext;
         this.frameworkConfig = frameworkConfig;
+        this.columnExpansionStrategies =
+                ShortcutUtils.unwrapTableConfig(relOptCluster)
+                        
.get(TableConfigOptions.TABLE_COLUMN_EXPANSION_STRATEGY);
     }
 
     public void setExpectedOutputType(SqlNode sqlNode, RelDataType 
expectedOutputType) {
@@ -145,7 +158,7 @@ public final class FlinkCalciteSqlValidator extends 
SqlValidatorImpl {
                 SqlNode operand0 = call.operand(0);
                 if (operand0 instanceof SqlBasicCall
                         && ((SqlBasicCall) operand0).getOperator()
-                                instanceof SqlWindowTableFunction) {
+                                instanceof 
org.apache.calcite.sql.SqlWindowTableFunction) {
                     return;
                 }
             }
@@ -295,11 +308,8 @@ public final class FlinkCalciteSqlValidator extends 
SqlValidatorImpl {
             SqlNode exp,
             SelectScope scope,
             boolean includeSystemVars) {
-        final List<ColumnExpansionStrategy> strategies =
-                ShortcutUtils.unwrapTableConfig(relOptCluster)
-                        
.get(TableConfigOptions.TABLE_COLUMN_EXPANSION_STRATEGY);
         // Extract column's origin to apply strategy
-        if (!strategies.isEmpty() && exp instanceof SqlIdentifier) {
+        if (!columnExpansionStrategies.isEmpty() && exp instanceof 
SqlIdentifier) {
             final SqlQualified qualified = scope.fullyQualify((SqlIdentifier) 
exp);
             if (qualified.namespace != null && qualified.namespace.getTable() 
!= null) {
                 final CatalogSchemaTable schemaTable =
@@ -309,7 +319,8 @@ public final class FlinkCalciteSqlValidator extends 
SqlValidatorImpl {
                 final String columnName = qualified.suffix().get(0);
                 final Column column = 
resolvedSchema.getColumn(columnName).orElse(null);
                 if (qualified.suffix().size() == 1 && column != null) {
-                    if (includeExpandedColumn(column, strategies)) {
+                    if (includeExpandedColumn(column, 
columnExpansionStrategies)
+                            || declaredDescriptorColumn(scope, column)) {
                         super.addToSelectList(
                                 list, aliases, fieldList, exp, scope, 
includeSystemVars);
                     }
@@ -321,4 +332,131 @@ public final class FlinkCalciteSqlValidator extends 
SqlValidatorImpl {
         // Always add to list
         super.addToSelectList(list, aliases, fieldList, exp, scope, 
includeSystemVars);
     }
+
+    /**
+     * Performs the unconditional rewrites of the parent validator and, for window table
+     * functions, replaces explicit table operands with a marker {@link SqlSelect} that carries the
+     * identifiers declared in the call's descriptors.
+     *
+     * <p>Special case for window TVFs like:
+     *
+     * <pre>{@code
+     * TUMBLE(TABLE t, DESCRIPTOR(metadata_virtual), INTERVAL '1' MINUTE)
+     * }</pre>
+     *
+     * <p>"TABLE t" is translated into an implicit "SELECT * FROM t". This would ignore columns
+     * that are not expanded by default. However, the descriptor explicitly states the need for
+     * this column. Therefore, explicit table expressions (for window TVFs at most one) are
+     * captured before rewriting and replaced with a "marker" SqlSelect that contains the
+     * descriptor information. The "marker" SqlSelect is considered during column expansion.
+     *
+     * @param node the node to rewrite, may be {@code null}
+     * @param underFrom passed through unchanged to the parent implementation
+     * @return the node returned by the parent implementation
+     */
+    @Override
+    protected @PolyNull SqlNode performUnconditionalRewrites(
+            @PolyNull SqlNode node, boolean underFrom) {
+
+        // Must happen before delegating to the parent, which rewrites the explicit table operands.
+        final List<SqlIdentifier> explicitTableArgs = getExplicitTableOperands(node);
+
+        final SqlNode rewritten = super.performUnconditionalRewrites(node, underFrom);
+
+        if (!(node instanceof SqlBasicCall)) {
+            return rewritten;
+        }
+        final SqlBasicCall call = (SqlBasicCall) node;
+        final SqlOperator operator = call.getOperator();
+
+        if (!(operator instanceof SqlWindowTableFunction)) {
+            return rewritten;
+        }
+
+        // Be defensive: no operand list may have been captured for this call (for example if the
+        // call was not recognized as a table function before the rewrite). Without this guard the
+        // stream access below would fail with a NullPointerException.
+        if (explicitTableArgs == null || explicitTableArgs.stream().allMatch(Objects::isNull)) {
+            return rewritten;
+        }
+
+        // Collect all identifiers that are declared in any DESCRIPTOR(...) operand of the call.
+        final List<SqlIdentifier> descriptors =
+                call.getOperandList().stream()
+                        .filter(op -> op.getKind() == SqlKind.DESCRIPTOR)
+                        .flatMap(
+                                desc ->
+                                        ((SqlBasicCall) desc)
+                                                .getOperandList().stream()
+                                                        .filter(SqlIdentifier.class::isInstance)
+                                                        .map(SqlIdentifier.class::cast))
+                        .collect(Collectors.toList());
+
+        // Replace every captured explicit table operand with the marker select.
+        for (int i = 0; i < call.operandCount(); i++) {
+            final SqlIdentifier tableArg = explicitTableArgs.get(i);
+            if (tableArg != null) {
+                call.setOperand(i, new ExplicitTableSqlSelect(tableArg, descriptors));
+            }
+        }
+
+        return rewritten;
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Column expansion
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Marker {@link SqlSelect} that stands in for a {@link SqlKind#EXPLICIT_TABLE} operand of a
+     * TVF. It selects a single star from the given table identifier and additionally remembers
+     * the identifiers that were declared in the descriptors of the surrounding call.
+     */
+    private static class ExplicitTableSqlSelect extends SqlSelect {
+
+        /** Identifiers declared in the descriptors next to the explicit table operand. */
+        private final List<SqlIdentifier> descriptors;
+
+        public ExplicitTableSqlSelect(SqlIdentifier table, List<SqlIdentifier> descriptors) {
+            // Only the select list and the table are set; all other clauses stay empty.
+            super(
+                    SqlParserPos.ZERO,
+                    null,
+                    starSelectList(),
+                    table,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null);
+            this.descriptors = descriptors;
+        }
+
+        /** Creates a select list that consists of a single star identifier. */
+        private static SqlNodeList starSelectList() {
+            return SqlNodeList.of(SqlIdentifier.star(SqlParserPos.ZERO));
+        }
+    }
+
+    /**
+     * Returns whether the given column has been declared in a {@link SqlKind#DESCRIPTOR} next to a
+     * {@link SqlKind#EXPLICIT_TABLE} within TVF operands.
+     *
+     * <p>Only scopes whose node is an {@link ExplicitTableSqlSelect} carry descriptor
+     * information; for every other scope this method returns {@code false}.
+     *
+     * @param scope the scope of the select whose columns are currently expanded
+     * @param column the column that is a candidate for expansion
+     * @return {@code true} if a simple descriptor identifier equals the column's name
+     */
+    private static boolean declaredDescriptorColumn(SelectScope scope, Column column) {
+        final SqlNode scopeNode = scope.getNode();
+        if (!(scopeNode instanceof ExplicitTableSqlSelect)) {
+            return false;
+        }
+        final String columnName = column.getName();
+        return ((ExplicitTableSqlSelect) scopeNode)
+                .descriptors.stream()
+                        // getSimple() is only defined for single-part identifiers; a compound
+                        // identifier can never equal a plain column name, so skip it.
+                        .filter(SqlIdentifier::isSimple)
+                        .map(SqlIdentifier::getSimple)
+                        .anyMatch(id -> id.equals(columnName));
+    }
+
+    /**
+     * Returns all {@link SqlKind#EXPLICIT_TABLE} operands within TVF 
operands. A list entry is
+     * {@code null} if the operand is not an {@link SqlKind#EXPLICIT_TABLE}.
+     */
+    private static List<SqlIdentifier> getExplicitTableOperands(SqlNode node) {
+        if (!(node instanceof SqlBasicCall)) {
+            return null;
+        }
+        final SqlBasicCall call = (SqlBasicCall) node;
+
+        if (!(call.getOperator() instanceof SqlFunction)) {
+            return null;
+        }
+        final SqlFunction function = (SqlFunction) call.getOperator();
+
+        if (function.getFunctionType() != 
SqlFunctionCategory.USER_DEFINED_TABLE_FUNCTION) {
+            return null;
+        }
+
+        return call.getOperandList().stream()
+                .map(
+                        op -> {
+                            if (op.getKind() == SqlKind.EXPLICIT_TABLE) {
+                                final SqlBasicCall opCall = (SqlBasicCall) op;
+                                if (opCall.operandCount() == 1
+                                        && opCall.operand(0) instanceof 
SqlIdentifier) {
+                                    return (SqlIdentifier) opCall.operand(0);
+                                }
+                            }
+                            return null;
+                        })
+                .collect(Collectors.toList());
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
index 43cccf6e7f5..290b981583e 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
@@ -68,6 +68,17 @@ public class ColumnExpansionTest {
                         + " 'readable-metadata' = 
't2_m_virtual:INT,k1:STRING,t2_m_default:INT,k2:STRING'\n"
                         + ")");
 
+        tableEnv.executeSql(
+                "CREATE TABLE t3 (\n"
+                        + "  t3_s STRING,\n"
+                        + "  t3_i INT,\n"
+                        + "  t3_m_virtual TIMESTAMP_LTZ(3) METADATA VIRTUAL,\n"
+                        + "  WATERMARK FOR t3_m_virtual AS t3_m_virtual - 
INTERVAL '1' SECOND\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values',\n"
+                        + " 'readable-metadata' = 
't3_m_virtual:TIMESTAMP_LTZ(3)'\n"
+                        + ")");
+
         tableEnv.getConfig().set(TABLE_COLUMN_EXPANSION_STRATEGY, 
Collections.emptyList());
     }
 
@@ -222,6 +233,33 @@ public class ColumnExpansionTest {
         assertColumnNames("SELECT * FROM v1", "t1_i", "t1_s", "t1_m_default", 
"t1_m_aliased");
     }
 
+    /**
+     * Verifies that a virtual metadata column which is excluded from expansion by the configured
+     * strategy is still expanded for an explicit table operand of a window TVF when the column is
+     * declared in the descriptor.
+     */
+    @Test
+    public void testExplicitTableWithinTableFunction() {
+        tableEnv.getConfig()
+                .set(
+                        TABLE_COLUMN_EXPANSION_STRATEGY,
+                        Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+
+        // Window TVF expression shared by both queries below
+        final String tumbleOverT3 =
+                "TABLE(TUMBLE(TABLE t3, DESCRIPTOR(t3_m_virtual), INTERVAL '1' MINUTE))";
+
+        // t3_m_virtual is selected due to expansion of the explicit table expression
+        // with hints from descriptor
+        final String selectStar = "SELECT * FROM " + tumbleOverT3;
+        assertColumnNames(
+                selectStar,
+                "t3_s",
+                "t3_i",
+                "t3_m_virtual",
+                "window_start",
+                "window_end",
+                "window_time");
+
+        // Test common window TVF syntax
+        final String windowAggregation =
+                "SELECT t3_s, SUM(t3_i) AS agg FROM "
+                        + tumbleOverT3
+                        + " GROUP BY t3_s, window_start, window_end";
+        assertColumnNames(windowAggregation, "t3_s", "agg");
+    }
+
     private void assertColumnNames(String sql, String... columnNames) {
         assertThat(tableEnv.sqlQuery(sql).getResolvedSchema().getColumnNames())
                 .containsExactly(columnNames);

Reply via email to