This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 03c8185090e [FLINK-36639] Use table aliases in Table API asSerializableString (#25597)
03c8185090e is described below

commit 03c8185090ef7f91b525315ff828afdd97925c0d
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Mon Nov 4 09:11:01 2024 +0100

    [FLINK-36639] Use table aliases in Table API asSerializableString (#25597)
---
 .../table/operations/AggregateQueryOperation.java  |  21 ++-
 .../table/operations/CalculatedQueryOperation.java |   7 +-
 .../table/operations/DistinctQueryOperation.java   |   8 +-
 .../table/operations/FilterQueryOperation.java     |  10 +-
 .../flink/table/operations/JoinQueryOperation.java |  29 +++-
 .../flink/table/operations/OperationUtils.java     |  15 +-
 .../table/operations/ProjectQueryOperation.java    |   7 +-
 .../flink/table/operations/SetQueryOperation.java  |   2 +-
 .../flink/table/operations/SortQueryOperation.java |  13 +-
 .../table/operations/SourceQueryOperation.java     |  12 +-
 .../table/operations/ValuesQueryOperation.java     |   9 +-
 .../operations/WindowAggregateQueryOperation.java  |  14 +-
 .../utils/OperationExpressionsUtils.java           |  54 ++++++
 .../expressions/FieldReferenceExpression.java      |  26 ++-
 .../table/api/QueryOperationSqlExecutionTest.java  |   3 +-
 .../api/QueryOperationSqlSerializationTest.java    |   3 +-
 .../table/api/QueryOperationTestPrograms.java      | 190 +++++++++++++++------
 17 files changed, 328 insertions(+), 95 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/AggregateQueryOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/AggregateQueryOperation.java
index c1e6d3a5479..9dcdaee104d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/AggregateQueryOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/AggregateQueryOperation.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.operations;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.operations.utils.OperationExpressionsUtils;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -35,6 +36,7 @@ import java.util.stream.Stream;
 @Internal
 public class AggregateQueryOperation implements QueryOperation {
 
+    private static final String INPUT_ALIAS = "$$T_AGG";
     private final List<ResolvedExpression> groupingExpressions;
     private final List<ResolvedExpression> aggregateExpressions;
     private final QueryOperation child;
@@ -78,11 +80,16 @@ public class AggregateQueryOperation implements QueryOperation {
     public String asSerializableString() {
         final String groupingExprs = getGroupingExprs();
         return String.format(
-                "SELECT %s FROM (%s\n)\nGROUP BY %s",
+                "SELECT %s FROM (%s\n) %s\nGROUP BY %s",
                 Stream.concat(groupingExpressions.stream(), 
aggregateExpressions.stream())
+                        .map(
+                                expr ->
+                                        
OperationExpressionsUtils.scopeReferencesWithAlias(
+                                                INPUT_ALIAS, expr))
                         .map(ResolvedExpression::asSerializableString)
                         .collect(Collectors.joining(", ")),
                 OperationUtils.indent(child.asSerializableString()),
+                INPUT_ALIAS,
                 groupingExprs);
     }
 
@@ -90,11 +97,13 @@ public class AggregateQueryOperation implements 
QueryOperation {
         if (groupingExpressions.isEmpty()) {
             return "1";
         } else {
-            final String groupingExprs =
-                    groupingExpressions.stream()
-                            .map(ResolvedExpression::asSerializableString)
-                            .collect(Collectors.joining(", "));
-            return groupingExprs;
+            return groupingExpressions.stream()
+                    .map(
+                            expr ->
+                                    
OperationExpressionsUtils.scopeReferencesWithAlias(
+                                            INPUT_ALIAS, expr))
+                    .map(ResolvedExpression::asSerializableString)
+                    .collect(Collectors.joining(", "));
         }
     }
 
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CalculatedQueryOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CalculatedQueryOperation.java
index f4bc5ee4bcc..8b264fa3b61 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CalculatedQueryOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CalculatedQueryOperation.java
@@ -33,6 +33,8 @@ import java.util.Map;
 @Internal
 public class CalculatedQueryOperation implements QueryOperation {
 
+    public static final String INPUT_ALIAS = "$$T_LAT";
+
     private final ContextResolvedFunction resolvedFunction;
     private final List<ResolvedExpression> arguments;
     private final ResolvedSchema resolvedSchema;
@@ -74,11 +76,12 @@ public class CalculatedQueryOperation implements 
QueryOperation {
         // if we ever add multi-way join in JoinQueryOperation we need to sort 
out uniqueness of the
         // table name
         return String.format(
-                "LATERAL TABLE(%s) T$0(%s)",
+                "LATERAL TABLE(%s) %s(%s)",
                 resolvedFunction
                         .toCallExpression(arguments, 
resolvedSchema.toPhysicalRowDataType())
                         .asSerializableString(),
-                OperationUtils.formatSelectColumns(resolvedSchema));
+                INPUT_ALIAS,
+                OperationUtils.formatSelectColumns(resolvedSchema, null));
     }
 
     @Override
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DistinctQueryOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DistinctQueryOperation.java
index 96f42d5016f..69995d280a8 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DistinctQueryOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DistinctQueryOperation.java
@@ -28,6 +28,7 @@ import java.util.List;
 @Internal
 public class DistinctQueryOperation implements QueryOperation {
 
+    private static final String INPUT_ALIAS = "$$T_DISTINCT";
     private final QueryOperation child;
 
     public DistinctQueryOperation(QueryOperation child) {
@@ -48,9 +49,10 @@ public class DistinctQueryOperation implements 
QueryOperation {
     @Override
     public String asSerializableString() {
         return String.format(
-                "SELECT DISTINCT %s FROM (%s\n)",
-                OperationUtils.formatSelectColumns(getResolvedSchema()),
-                OperationUtils.indent(child.asSerializableString()));
+                "SELECT DISTINCT %s FROM (%s\n) %s",
+                OperationUtils.formatSelectColumns(getResolvedSchema(), 
INPUT_ALIAS),
+                OperationUtils.indent(child.asSerializableString()),
+                INPUT_ALIAS);
     }
 
     @Override
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/FilterQueryOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/FilterQueryOperation.java
index 97ffb91bbc0..14639b91295 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/FilterQueryOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/FilterQueryOperation.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.operations;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.operations.utils.OperationExpressionsUtils;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -31,6 +32,7 @@ import java.util.Map;
 @Internal
 public class FilterQueryOperation implements QueryOperation {
 
+    private static final String INPUT_ALIAS = "$$T_FILTER";
     private final ResolvedExpression condition;
     private final QueryOperation child;
 
@@ -60,10 +62,12 @@ public class FilterQueryOperation implements QueryOperation 
{
     @Override
     public String asSerializableString() {
         return String.format(
-                "SELECT %s FROM (%s\n) WHERE %s",
-                OperationUtils.formatSelectColumns(getResolvedSchema()),
+                "SELECT %s FROM (%s\n) %s WHERE %s",
+                OperationUtils.formatSelectColumns(getResolvedSchema(), 
INPUT_ALIAS),
                 OperationUtils.indent(child.asSerializableString()),
-                condition.asSerializableString());
+                INPUT_ALIAS,
+                
OperationExpressionsUtils.scopeReferencesWithAlias(INPUT_ALIAS, condition)
+                        .asSerializableString());
     }
 
     @Override
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/JoinQueryOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/JoinQueryOperation.java
index bd139ca3d10..7c785a29009 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/JoinQueryOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/JoinQueryOperation.java
@@ -21,8 +21,10 @@ package org.apache.flink.table.operations;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.operations.utils.OperationExpressionsUtils;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,6 +35,8 @@ import java.util.stream.Stream;
 @Internal
 public class JoinQueryOperation implements QueryOperation {
 
+    private static final String INPUT_1_ALIAS = "$$T1_JOIN";
+    private static final String INPUT_2_ALIAS = "$$T2_JOIN";
     private final QueryOperation left;
     private final QueryOperation right;
     private final JoinType joinType;
@@ -108,13 +112,30 @@ public class JoinQueryOperation implements QueryOperation 
{
 
     @Override
     public String asSerializableString() {
+
+        Map<Integer, String> inputAliases = new HashMap<>();
+        inputAliases.put(0, INPUT_1_ALIAS);
+        inputAliases.put(1, correlated ? CalculatedQueryOperation.INPUT_ALIAS 
: INPUT_2_ALIAS);
+
         return String.format(
-                "SELECT %s FROM (%s\n) %s JOIN %s ON %s",
-                OperationUtils.formatSelectColumns(resolvedSchema),
+                "SELECT %s FROM (%s\n) %s %s JOIN %s ON %s",
+                getSelectList(),
                 OperationUtils.indent(left.asSerializableString()),
+                INPUT_1_ALIAS,
                 joinType.toString().replaceAll("_", " "),
                 rightToSerializable(),
-                condition.asSerializableString());
+                
OperationExpressionsUtils.scopeReferencesWithAlias(inputAliases, condition)
+                        .asSerializableString());
+    }
+
+    private String getSelectList() {
+        String leftColumns =
+                OperationUtils.formatSelectColumns(left.getResolvedSchema(), 
INPUT_1_ALIAS);
+        String rightColumns =
+                OperationUtils.formatSelectColumns(
+                        right.getResolvedSchema(),
+                        correlated ? CalculatedQueryOperation.INPUT_ALIAS : 
INPUT_2_ALIAS);
+        return leftColumns + ", " + rightColumns;
     }
 
     private String rightToSerializable() {
@@ -125,6 +146,8 @@ public class JoinQueryOperation implements QueryOperation {
         s.append(OperationUtils.indent(right.asSerializableString()));
         if (!correlated) {
             s.append("\n)");
+            s.append(" ");
+            s.append(INPUT_2_ALIAS);
         }
         return s.toString();
     }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java
index ab9a8675f88..2455e4a344c 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.utils.EncodingUtils;
 import org.apache.flink.util.StringUtils;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -118,9 +120,18 @@ public class OperationUtils {
         return stringBuilder.append(childrenDescription).toString();
     }
 
-    public static String formatSelectColumns(ResolvedSchema schema) {
+    public static String formatSelectColumns(ResolvedSchema schema, @Nullable 
String inputAlias) {
         return schema.getColumnNames().stream()
-                .map(EncodingUtils::escapeIdentifier)
+                .map(
+                        i -> {
+                            if (inputAlias == null) {
+                                return EncodingUtils.escapeIdentifier(i);
+                            }
+                            return String.format(
+                                    "%s.%s",
+                                    EncodingUtils.escapeIdentifier(inputAlias),
+                                    EncodingUtils.escapeIdentifier(i));
+                        })
                 .collect(Collectors.joining(", "));
     }
 
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ProjectQueryOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ProjectQueryOperation.java
index f5359043fb9..b2ebae350c7 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ProjectQueryOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ProjectQueryOperation.java
@@ -42,6 +42,7 @@ import java.util.stream.IntStream;
 @Internal
 public class ProjectQueryOperation implements QueryOperation {
 
+    private static final String INPUT_ALIAS = "$$T_PROJECT";
     private final List<ResolvedExpression> projectList;
     private final QueryOperation child;
     private final ResolvedSchema resolvedSchema;
@@ -76,9 +77,13 @@ public class ProjectQueryOperation implements QueryOperation 
{
     @Override
     public String asSerializableString() {
         return String.format(
-                "SELECT %s FROM (%s\n)",
+                "SELECT %s FROM (%s\n) " + INPUT_ALIAS,
                 IntStream.range(0, projectList.size())
                         .mapToObj(this::alias)
+                        .map(
+                                expr ->
+                                        
OperationExpressionsUtils.scopeReferencesWithAlias(
+                                                INPUT_ALIAS, expr))
                         .map(ResolvedExpression::asSerializableString)
                         .collect(Collectors.joining(", ")),
                 OperationUtils.indent(child.asSerializableString()));
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SetQueryOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SetQueryOperation.java
index c43cc7ca1ef..dbbae5c0398 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SetQueryOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SetQueryOperation.java
@@ -88,7 +88,7 @@ public class SetQueryOperation implements QueryOperation {
     public String asSerializableString() {
         return String.format(
                 "SELECT %s FROM (%s\n) %s (%s\n)",
-                OperationUtils.formatSelectColumns(resolvedSchema),
+                OperationUtils.formatSelectColumns(resolvedSchema, null),
                 OperationUtils.indent(leftOperation.asSerializableString()),
                 asSerializableType(),
                 OperationUtils.indent(rightOperation.asSerializableString()));
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SortQueryOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SortQueryOperation.java
index d447bdf58ac..3575f90350b 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SortQueryOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SortQueryOperation.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.operations;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.operations.utils.OperationExpressionsUtils;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -35,6 +36,7 @@ import java.util.stream.Collectors;
 @Internal
 public class SortQueryOperation implements QueryOperation {
 
+    private static final String INPUT_ALIAS = "$$T_SORT";
     private final List<ResolvedExpression> order;
     private final QueryOperation child;
     private final int offset;
@@ -89,10 +91,17 @@ public class SortQueryOperation implements QueryOperation {
         final StringBuilder s =
                 new StringBuilder(
                         String.format(
-                                "SELECT %s FROM (%s\n) ORDER BY %s",
-                                
OperationUtils.formatSelectColumns(getResolvedSchema()),
+                                "SELECT %s FROM (%s\n) %s ORDER BY %s",
+                                OperationUtils.formatSelectColumns(
+                                        getResolvedSchema(), INPUT_ALIAS),
                                 
OperationUtils.indent(child.asSerializableString()),
+                                INPUT_ALIAS,
                                 order.stream()
+                                        .map(
+                                                expr ->
+                                                        
OperationExpressionsUtils
+                                                                
.scopeReferencesWithAlias(
+                                                                        
INPUT_ALIAS, expr))
                                         
.map(ResolvedExpression::asSerializableString)
                                         .collect(Collectors.joining(", "))));
 
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SourceQueryOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SourceQueryOperation.java
index 926c79c52e3..203e5b47109 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SourceQueryOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SourceQueryOperation.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.utils.EncodingUtils;
 
 import javax.annotation.Nullable;
 
@@ -31,7 +30,6 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 /**
  * Describes a query operation from a {@link ContextResolvedTable}.
@@ -43,6 +41,7 @@ import java.util.stream.Collectors;
 @Internal
 public class SourceQueryOperation implements QueryOperation {
 
+    private static final String INPUT_ALIAS = "$$T_SOURCE";
     private final ContextResolvedTable contextResolvedTable;
     private final @Nullable Map<String, String> dynamicOptions;
 
@@ -83,11 +82,10 @@ public class SourceQueryOperation implements QueryOperation 
{
     public String asSerializableString() {
         String s =
                 String.format(
-                        "SELECT %s FROM %s",
-                        getResolvedSchema().getColumnNames().stream()
-                                .map(EncodingUtils::escapeIdentifier)
-                                .collect(Collectors.joining(", ")),
-                        
getContextResolvedTable().getIdentifier().asSerializableString());
+                        "SELECT %s FROM %s %s",
+                        
OperationUtils.formatSelectColumns(getResolvedSchema(), INPUT_ALIAS),
+                        
getContextResolvedTable().getIdentifier().asSerializableString(),
+                        INPUT_ALIAS);
 
         if (dynamicOptions != null && !dynamicOptions.isEmpty()) {
             throw new TableException("Dynamic source options are not SQL 
serializable yet.");
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ValuesQueryOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ValuesQueryOperation.java
index 3400529296f..39849cd77f5 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ValuesQueryOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ValuesQueryOperation.java
@@ -37,6 +37,7 @@ import java.util.stream.Collectors;
 @Internal
 public class ValuesQueryOperation implements QueryOperation {
 
+    private static final String INPUT_ALIAS = "$$T_VAL";
     private final List<List<ResolvedExpression>> values;
     private final ResolvedSchema resolvedSchema;
 
@@ -66,10 +67,9 @@ public class ValuesQueryOperation implements QueryOperation {
 
     @Override
     public String asSerializableString() {
-        final String selectColumns = 
OperationUtils.formatSelectColumns(resolvedSchema);
         return String.format(
-                "SELECT %s FROM (VALUES %s\n) VAL$0(%s)",
-                selectColumns,
+                "SELECT %s FROM (VALUES %s\n) %s(%s)",
+                OperationUtils.formatSelectColumns(resolvedSchema, 
INPUT_ALIAS),
                 OperationUtils.indent(
                         values.stream()
                                 .map(
@@ -81,7 +81,8 @@ public class ValuesQueryOperation implements QueryOperation {
                                                         .collect(
                                                                 
Collectors.joining(", ", "(", ")")))
                                 .collect(Collectors.joining(",\n"))),
-                selectColumns);
+                INPUT_ALIAS,
+                OperationUtils.formatSelectColumns(resolvedSchema, null));
     }
 
     @Override
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/WindowAggregateQueryOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/WindowAggregateQueryOperation.java
index 8eea2d4b9cb..0b13fb6ddcb 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/WindowAggregateQueryOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/WindowAggregateQueryOperation.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.operations.utils.OperationExpressionsUtils;
 import org.apache.flink.util.StringUtils;
 
 import javax.annotation.Nullable;
@@ -50,6 +51,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class WindowAggregateQueryOperation implements QueryOperation {
 
+    private static final String INPUT_ALIAS = "$$T_WIN_AGG";
     private final List<ResolvedExpression> groupingExpressions;
     private final List<ResolvedExpression> aggregateExpressions;
     private final List<ResolvedExpression> windowPropertiesExpressions;
@@ -92,19 +94,29 @@ public class WindowAggregateQueryOperation implements 
QueryOperation {
     @Override
     public String asSerializableString() {
         return String.format(
-                "SELECT %s FROM TABLE(%s\n) GROUP BY %s",
+                "SELECT %s FROM TABLE(%s\n) %s GROUP BY %s",
                 Stream.of(
                                 groupingExpressions.stream(),
                                 aggregateExpressions.stream(),
                                 windowPropertiesExpressions.stream())
                         .flatMap(Function.identity())
+                        .map(
+                                expr ->
+                                        
OperationExpressionsUtils.scopeReferencesWithAlias(
+                                                INPUT_ALIAS, expr))
                         .map(ResolvedExpression::asSerializableString)
                         .collect(Collectors.joining(", ")),
                 OperationUtils.indent(
                         
groupWindow.asSerializableString(child.asSerializableString())),
+                INPUT_ALIAS,
                 Stream.concat(
                                 Stream.of("window_start", "window_end"),
                                 groupingExpressions.stream()
+                                        .map(
+                                                expr ->
+                                                        
OperationExpressionsUtils
+                                                                
.scopeReferencesWithAlias(
+                                                                        
INPUT_ALIAS, expr))
                                         
.map(ResolvedExpression::asSerializableString))
                         .collect(Collectors.joining(", ")));
     }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationExpressionsUtils.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationExpressionsUtils.java
index 20342f0d746..4d7af03afa8 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationExpressionsUtils.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationExpressionsUtils.java
@@ -28,10 +28,12 @@ import 
org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.TableReferenceExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
+import 
org.apache.flink.table.expressions.utils.ResolvedExpressionDefaultVisitor;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.operations.QueryOperation;
 
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -287,5 +289,57 @@ public class OperationExpressionsUtils {
         }
     }
 
+    /**
+     * Adds an input alias to all {@link FieldReferenceExpression} in the 
given {@code expression}.
+     */
+    public static ResolvedExpression scopeReferencesWithAlias(
+            final String aliasName, final ResolvedExpression expression) {
+        return expression.accept(
+                new TableReferenceScopingVisitor(Collections.singletonMap(0, 
aliasName)));
+    }
+
+    /**
+     * Adds an input alias to all {@link FieldReferenceExpression} in the 
given {@code expression}.
+     * This method accepts multiple aliases for given input indices.
+     */
+    public static ResolvedExpression scopeReferencesWithAlias(
+            final Map<Integer, String> inputAliases, final ResolvedExpression 
expression) {
+        return expression.accept(new 
TableReferenceScopingVisitor(inputAliases));
+    }
+
+    private static class TableReferenceScopingVisitor
+            extends ResolvedExpressionDefaultVisitor<ResolvedExpression> {
+
+        private final Map<Integer, String> inputAliases;
+
+        private TableReferenceScopingVisitor(Map<Integer, String> 
inputAliases) {
+            this.inputAliases = inputAliases;
+        }
+
+        @Override
+        public ResolvedExpression visit(CallExpression call) {
+            List<ResolvedExpression> scopedChildren =
+                    call.getChildren().stream()
+                            .map(c -> c.accept(this))
+                            .collect(Collectors.toList());
+            return call.replaceArgs(scopedChildren, call.getOutputDataType());
+        }
+
+        @Override
+        public ResolvedExpression visit(FieldReferenceExpression 
fieldReference) {
+            return new FieldReferenceExpression(
+                    fieldReference.getName(),
+                    fieldReference.getOutputDataType(),
+                    fieldReference.getInputIndex(),
+                    fieldReference.getFieldIndex(),
+                    inputAliases.get(fieldReference.getInputIndex()));
+        }
+
+        @Override
+        protected ResolvedExpression defaultMethod(ResolvedExpression 
expression) {
+            return expression;
+        }
+    }
+
     private OperationExpressionsUtils() {}
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/FieldReferenceExpression.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/FieldReferenceExpression.java
index 42f19ca3d66..ed835aac7d2 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/FieldReferenceExpression.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/FieldReferenceExpression.java
@@ -23,6 +23,8 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.EncodingUtils;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -34,6 +36,7 @@ import java.util.Objects;
  *   <li>type
  *   <li>index of an input the field belongs to
  *   <li>index of a field within the corresponding input
+ *   <li>optional: alias of the input, if it needs to be referenced by name
  * </ul>
  */
 @PublicEvolving
@@ -52,14 +55,26 @@ public final class FieldReferenceExpression implements 
ResolvedExpression {
     /** index of a field within the corresponding input. */
     private final int fieldIndex;
 
+    private final @Nullable String inputAlias;
+
     public FieldReferenceExpression(
             String name, DataType dataType, int inputIndex, int fieldIndex) {
+        this(name, dataType, inputIndex, fieldIndex, null);
+    }
+
+    public FieldReferenceExpression(
+            String name,
+            DataType dataType,
+            int inputIndex,
+            int fieldIndex,
+            @Nullable String inputAlias) {
         Preconditions.checkArgument(inputIndex >= 0, "Index of input should be 
a positive number");
         Preconditions.checkArgument(fieldIndex >= 0, "Index of field should be 
a positive number");
         this.name = Preconditions.checkNotNull(name, "Field name must not be 
null.");
         this.dataType = Preconditions.checkNotNull(dataType, "Field data type 
must not be null.");
         this.inputIndex = inputIndex;
         this.fieldIndex = fieldIndex;
+        this.inputAlias = inputAlias;
     }
 
     public String getName() {
@@ -91,6 +106,12 @@ public final class FieldReferenceExpression implements 
ResolvedExpression {
 
     @Override
     public String asSerializableString() {
+        if (inputAlias != null) {
+            return String.format(
+                    "%s.%s",
+                    EncodingUtils.escapeIdentifier(inputAlias),
+                    EncodingUtils.escapeIdentifier(name));
+        }
         return EncodingUtils.escapeIdentifier(name);
     }
 
@@ -116,12 +137,13 @@ public final class FieldReferenceExpression implements 
ResolvedExpression {
         return name.equals(that.name)
                 && dataType.equals(that.dataType)
                 && inputIndex == that.inputIndex
-                && fieldIndex == that.fieldIndex;
+                && fieldIndex == that.fieldIndex
+                && Objects.equals(inputAlias, that.inputAlias);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, dataType, inputIndex, fieldIndex);
+        return Objects.hash(name, dataType, inputIndex, fieldIndex, 
inputAlias);
     }
 
     @Override
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java
index 04526d7b29c..0fac184fb42 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java
@@ -74,7 +74,8 @@ public class QueryOperationSqlExecutionTest implements 
TableTestProgramRunner {
                 QueryOperationTestPrograms.SQL_QUERY_OPERATION,
                 QueryOperationTestPrograms.OVER_WINDOW_RANGE,
                 QueryOperationTestPrograms.OVER_WINDOW_ROWS,
-                
QueryOperationTestPrograms.OVER_WINDOW_ROWS_UNBOUNDED_NO_PARTITION);
+                
QueryOperationTestPrograms.OVER_WINDOW_ROWS_UNBOUNDED_NO_PARTITION,
+                QueryOperationTestPrograms.ACCESSING_NESTED_COLUMN);
     }
 
     @ParameterizedTest
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
index 285989247b6..a006b1a46c4 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
@@ -63,7 +63,8 @@ public class QueryOperationSqlSerializationTest implements 
TableTestProgramRunne
                 QueryOperationTestPrograms.SQL_QUERY_OPERATION,
                 QueryOperationTestPrograms.OVER_WINDOW_RANGE,
                 QueryOperationTestPrograms.OVER_WINDOW_ROWS,
-                
QueryOperationTestPrograms.OVER_WINDOW_ROWS_UNBOUNDED_NO_PARTITION);
+                
QueryOperationTestPrograms.OVER_WINDOW_ROWS_UNBOUNDED_NO_PARTITION,
+                QueryOperationTestPrograms.ACCESSING_NESTED_COLUMN);
     }
 
     @ParameterizedTest
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
index 23c2bd30e82..a53d3d3eda2 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
@@ -59,7 +59,10 @@ public class QueryOperationTestPrograms {
                                     .consumedValues(Row.of(1L, "abc"), 
Row.of(2L, "cde"))
                                     .build())
                     .runTableApi(t -> t.from("s"), "sink")
-                    .runSql("SELECT `a`, `b` FROM 
`default_catalog`.`default_database`.`s`")
+                    .runSql(
+                            "SELECT `$$T_SOURCE`.`a`, `$$T_SOURCE`.`b` FROM 
`default_catalog`"
+                                    + ".`default_database`.`s` "
+                                    + "$$T_SOURCE")
                     .build();
 
     static final TableTestProgram VALUES_QUERY_OPERATION =
@@ -71,10 +74,10 @@ public class QueryOperationTestPrograms {
                                     .build())
                     .runTableApi(t -> t.fromValues(row(1L, "abc"), row(2L, 
"cde")), "sink")
                     .runSql(
-                            "SELECT `f0`, `f1` FROM (VALUES \n"
+                            "SELECT `$$T_VAL`.`f0`, `$$T_VAL`.`f1` FROM 
(VALUES \n"
                                     + "    (CAST(1 AS BIGINT), 'abc'),\n"
                                     + "    (CAST(2 AS BIGINT), 'cde')\n"
-                                    + ") VAL$0(`f0`, `f1`)")
+                                    + ") $$T_VAL(`f0`, `f1`)")
                     .build();
 
     static final TableTestProgram FILTER_QUERY_OPERATION =
@@ -91,9 +94,10 @@ public class QueryOperationTestPrograms {
                                     .build())
                     .runTableApi(t -> 
t.from("s").where($("a").isGreaterOrEqual(15)), "sink")
                     .runSql(
-                            "SELECT `a`, `b` FROM (\n"
-                                    + "    SELECT `a`, `b` FROM 
`default_catalog`.`default_database`.`s`\n"
-                                    + ") WHERE `a` >= 15")
+                            "SELECT `$$T_FILTER`.`a`, `$$T_FILTER`.`b` FROM 
(\n"
+                                    + "    SELECT `$$T_SOURCE`.`a`, 
`$$T_SOURCE`.`b` FROM `default_catalog`"
+                                    + ".`default_database`.`s` $$T_SOURCE\n"
+                                    + ") $$T_FILTER WHERE `$$T_FILTER`.`a` >= 
15")
                     .build();
 
     static final TableTestProgram DISTINCT_QUERY_OPERATION =
@@ -114,11 +118,12 @@ public class QueryOperationTestPrograms {
                     .runTableApi(
                             t -> 
t.from("s").where($("a").isGreaterOrEqual(15)).distinct(), "sink")
                     .runSql(
-                            "SELECT DISTINCT `a`, `b` FROM (\n"
-                                    + "    SELECT `a`, `b` FROM (\n"
-                                    + "        SELECT `a`, `b` FROM 
`default_catalog`.`default_database`.`s`\n"
-                                    + "    ) WHERE `a` >= 15\n"
-                                    + ")")
+                            "SELECT DISTINCT `$$T_DISTINCT`.`a`, 
`$$T_DISTINCT`.`b` FROM (\n"
+                                    + "    SELECT `$$T_FILTER`.`a`, 
`$$T_FILTER`.`b` FROM (\n"
+                                    + "        SELECT `$$T_SOURCE`.`a`, 
`$$T_SOURCE`.`b` FROM `default_catalog`"
+                                    + ".`default_database`.`s` $$T_SOURCE\n"
+                                    + "    ) $$T_FILTER WHERE `$$T_FILTER`.`a` 
>= 15\n"
+                                    + ") $$T_DISTINCT")
                     .build();
 
     static final TableTestProgram AGGREGATE_QUERY_OPERATION =
@@ -140,12 +145,14 @@ public class QueryOperationTestPrograms {
                     .runTableApi(
                             t -> t.from("s").groupBy($("b")).select($("b"), 
$("a").sum()), "sink")
                     .runSql(
-                            "SELECT `b`, `EXPR$0` FROM (\n"
-                                    + "    SELECT `b`, (SUM(`a`)) AS `EXPR$0` 
FROM (\n"
-                                    + "        SELECT `a`, `b` FROM 
`default_catalog`.`default_database`.`s`\n"
-                                    + "    )\n"
-                                    + "    GROUP BY `b`\n"
-                                    + ")")
+                            "SELECT `$$T_PROJECT`.`b`, `$$T_PROJECT`.`EXPR$0` 
FROM (\n"
+                                    + "    SELECT `$$T_AGG`.`b`, 
(SUM(`$$T_AGG`.`a`)) AS `EXPR$0`"
+                                    + " FROM (\n"
+                                    + "        SELECT `$$T_SOURCE`.`a`, 
`$$T_SOURCE`.`b` FROM "
+                                    + 
"`default_catalog`.`default_database`.`s` $$T_SOURCE\n"
+                                    + "    ) $$T_AGG\n"
+                                    + "    GROUP BY `$$T_AGG`.`b`\n"
+                                    + ") $$T_PROJECT")
                     .build();
 
     static final TableTestProgram AGGREGATE_NO_GROUP_BY_QUERY_OPERATION =
@@ -167,12 +174,13 @@ public class QueryOperationTestPrograms {
                                     .build())
                     .runTableApi(t -> t.from("s").select($("a").sum()), "sink")
                     .runSql(
-                            "SELECT `EXPR$0` FROM (\n"
-                                    + "    SELECT (SUM(`a`)) AS `EXPR$0` FROM 
(\n"
-                                    + "        SELECT `a`, `b` FROM 
`default_catalog`.`default_database`.`s`\n"
-                                    + "    )\n"
+                            "SELECT `$$T_PROJECT`.`EXPR$0` FROM (\n"
+                                    + "    SELECT (SUM(`$$T_AGG`.`a`)) AS 
`EXPR$0` FROM (\n"
+                                    + "        SELECT `$$T_SOURCE`.`a`, 
`$$T_SOURCE`.`b` FROM "
+                                    + 
"`default_catalog`.`default_database`.`s` $$T_SOURCE\n"
+                                    + "    ) $$T_AGG\n"
                                     + "    GROUP BY 1\n"
-                                    + ")")
+                                    + ") $$T_PROJECT")
                     .build();
 
     static final TableTestProgram WINDOW_AGGREGATE_QUERY_OPERATION =
@@ -207,13 +215,14 @@ public class QueryOperationTestPrograms {
                                             .select($("b"), $("w").start(), 
$("a").sum()),
                             "sink")
                     .runSql(
-                            "SELECT `b`, `EXPR$0`, `EXPR$1` FROM (\n"
-                                    + "    SELECT `b`, (SUM(`a`)) AS `EXPR$1`, 
(window_start) AS `EXPR$0` FROM TABLE(\n"
+                            "SELECT `$$T_PROJECT`.`b`, `$$T_PROJECT`.`EXPR$0`, 
`$$T_PROJECT`.`EXPR$1` FROM (\n"
+                                    + "    SELECT `$$T_WIN_AGG`.`b`, 
(SUM(`$$T_WIN_AGG`.`a`)) AS `EXPR$1`, (window_start) AS `EXPR$0` FROM TABLE(\n"
                                     + "        TUMBLE((\n"
-                                    + "            SELECT `a`, `b`, `ts` FROM 
`default_catalog`.`default_database`.`s`\n"
+                                    + "            SELECT `$$T_SOURCE`.`a`, 
`$$T_SOURCE`.`b`, "
+                                    + "`$$T_SOURCE`.`ts` FROM 
`default_catalog`.`default_database`.`s` $$T_SOURCE\n"
                                     + "        ), DESCRIPTOR(`ts`), INTERVAL 
'0 00:00:05.0' DAY TO SECOND(3))\n"
-                                    + "    ) GROUP BY window_start, 
window_end, `b`\n"
-                                    + ")")
+                                    + "    ) $$T_WIN_AGG GROUP BY 
window_start, window_end, `$$T_WIN_AGG`.`b`\n"
+                                    + ") $$T_PROJECT")
                     .build();
 
     private static Instant dayOfSeconds(int second) {
@@ -259,13 +268,22 @@ public class QueryOperationTestPrograms {
                                             .select($("name"), $("d_name"), 
$("age")),
                             "sink")
                     .runSql(
-                            "SELECT `name`, `d_name`, `age` FROM (\n"
-                                    + "    SELECT `emp_id`, `e_dept_id`, 
`name`, `age`, `dept_id`, `d_name` FROM (\n"
-                                    + "        SELECT `emp_id`, `e_dept_id`, 
`name`, `age` FROM `default_catalog`.`default_database`.`e`\n"
-                                    + "    ) INNER JOIN (\n"
-                                    + "        SELECT `dept_id`, `d_name` FROM 
`default_catalog`.`default_database`.`d`\n"
-                                    + "    ) ON (`e_dept_id` = `dept_id`) AND 
(`age` >= 21)\n"
-                                    + ")")
+                            "SELECT `$$T_PROJECT`.`name`, 
`$$T_PROJECT`.`d_name`, `$$T_PROJECT`.`age` FROM (\n"
+                                    + "    SELECT `$$T1_JOIN`.`emp_id`, 
`$$T1_JOIN`.`e_dept_id`, "
+                                    + "`$$T1_JOIN`.`name`, "
+                                    + "`$$T1_JOIN`.`age`, 
`$$T2_JOIN`.`dept_id`, `$$T2_JOIN`"
+                                    + ".`d_name` FROM (\n"
+                                    + "        SELECT `$$T_SOURCE`.`emp_id`, 
`$$T_SOURCE`"
+                                    + ".`e_dept_id`, `$$T_SOURCE`.`name`, 
`$$T_SOURCE`.`age` FROM `default_catalog`.`default_database`.`e` $$T_SOURCE\n"
+                                    + "    ) $$T1_JOIN INNER JOIN (\n"
+                                    + "        SELECT `$$T_SOURCE`.`dept_id`, 
`$$T_SOURCE`"
+                                    + ".`d_name` FROM 
`default_catalog`.`default_database`.`d` $$T_SOURCE\n"
+                                    + "    ) $$T2_JOIN ON 
(`$$T1_JOIN`.`e_dept_id` = `$$T2_JOIN`"
+                                    + ".`dept_id`)"
+                                    + " AND "
+                                    + "(`$$T1_JOIN`.`age` "
+                                    + ">= 21)\n"
+                                    + ") $$T_PROJECT")
                     .build();
 
     static final TableTestProgram LATERAL_JOIN_QUERY_OPERATION =
@@ -285,10 +303,11 @@ public class QueryOperationTestPrograms {
                     .runTableApi(
                             t -> t.from("e").joinLateral(call("udtf", 
$("b")).as("f0")), "sink")
                     .runSql(
-                            "SELECT `a`, `b`, `f0` FROM (\n"
-                                    + "    SELECT `a`, `b` FROM 
`default_catalog`.`default_database`.`e`\n"
-                                    + ") INNER JOIN \n"
-                                    + "    LATERAL 
TABLE(`default_catalog`.`default_database`.`udtf`(`b`)) T$0(`f0`) ON TRUE")
+                            "SELECT `$$T1_JOIN`.`a`, `$$T1_JOIN`.`b`, 
`$$T_LAT`.`f0` FROM (\n"
+                                    + "    SELECT `$$T_SOURCE`.`a`, 
`$$T_SOURCE`.`b` FROM "
+                                    + 
"`default_catalog`.`default_database`.`e` $$T_SOURCE\n"
+                                    + ") $$T1_JOIN INNER JOIN \n"
+                                    + "    LATERAL 
TABLE(`default_catalog`.`default_database`.`udtf`(`b`)) $$T_LAT(`f0`) ON TRUE")
                     .build();
 
     static final TableTestProgram UNION_ALL_QUERY_OPERATION =
@@ -311,9 +330,11 @@ public class QueryOperationTestPrograms {
                     .runTableApi(t -> t.from("s").unionAll(t.from("t")), 
"sink")
                     .runSql(
                             "SELECT `a`, `b` FROM (\n"
-                                    + "    SELECT `a`, `b` FROM 
`default_catalog`.`default_database`.`s`\n"
+                                    + "    SELECT `$$T_SOURCE`.`a`, 
`$$T_SOURCE`.`b` FROM "
+                                    + 
"`default_catalog`.`default_database`.`s` $$T_SOURCE\n"
                                     + ") UNION ALL (\n"
-                                    + "    SELECT `a`, `b` FROM 
`default_catalog`.`default_database`.`t`\n"
+                                    + "    SELECT `$$T_SOURCE`.`a`, 
`$$T_SOURCE`.`b` FROM "
+                                    + 
"`default_catalog`.`default_database`.`t` $$T_SOURCE\n"
                                     + ")")
                     .build();
 
@@ -338,9 +359,11 @@ public class QueryOperationTestPrograms {
                             t -> t.from("s").orderBy($("a"), 
$("b").desc()).offset(1).fetch(2),
                             "sink")
                     .runSql(
-                            "SELECT `a`, `b` FROM (\n"
-                                    + "    SELECT `a`, `b` FROM 
`default_catalog`.`default_database`.`s`\n"
-                                    + ") ORDER BY `a` ASC, `b` DESC OFFSET 1 
ROWS FETCH NEXT 2 ROWS ONLY")
+                            "SELECT `$$T_SORT`.`a`, `$$T_SORT`.`b` FROM (\n"
+                                    + "    SELECT `$$T_SOURCE`.`a`, 
`$$T_SOURCE`.`b` FROM `default_catalog`"
+                                    + ".`default_database`.`s` $$T_SOURCE\n"
+                                    + ") $$T_SORT ORDER BY `$$T_SORT`.`a` ASC, 
`$$T_SORT`.`b` DESC"
+                                    + " OFFSET 1 ROWS FETCH NEXT 2 ROWS ONLY")
                     .build();
 
     static final TableTestProgram SQL_QUERY_OPERATION =
@@ -361,10 +384,12 @@ public class QueryOperationTestPrograms {
                                             .select($("a").plus(2), 
$("b").substr(2, 3)),
                             "sink")
                     .runSql(
-                            "SELECT (`a` + 2) AS `_c0`, (SUBSTR(`b`, 2, 3)) AS 
`_c1` FROM (\n"
+                            "SELECT (`$$T_PROJECT`.`a` + 2) AS `_c0`, 
(SUBSTR(`$$T_PROJECT`.`b`, "
+                                    + "2, 3)) AS "
+                                    + "`_c1` FROM (\n"
                                     + "    SELECT `s`.`a`, `s`.`b`\n"
                                     + "    FROM 
`default_catalog`.`default_database`.`s` AS `s`\n"
-                                    + ")")
+                                    + ") $$T_PROJECT")
                     .build();
 
     static final TableTestProgram GROUP_HOP_WINDOW_EVENT_TIME =
@@ -765,9 +790,13 @@ public class QueryOperationTestPrograms {
                                     .consumedAfterRestore(Row.of("Apple", 3L, 
dayOfSeconds(2)))
                                     .build())
                     .runSql(
-                            "SELECT `k`, (LAST_VALUE(`v`) OVER(PARTITION BY 
`k` ORDER BY `ts` RANGE BETWEEN INTERVAL '0 00:00:02.0' DAY TO SECOND(3) 
PRECEDING AND CURRENT ROW)) AS `_c1`, `ts` FROM (\n"
-                                    + "    SELECT `k`, `v`, `ts` FROM 
`default_catalog`.`default_database`.`data`\n"
-                                    + ")")
+                            "SELECT `$$T_PROJECT`.`k`, 
(LAST_VALUE(`$$T_PROJECT`.`v`) "
+                                    + "OVER(PARTITION BY `$$T_PROJECT`.`k` "
+                                    + "ORDER BY `$$T_PROJECT`.`ts` RANGE 
BETWEEN INTERVAL '0 "
+                                    + "00:00:02.0' DAY TO SECOND(3) PRECEDING 
AND CURRENT ROW)) AS `_c1`, `$$T_PROJECT`.`ts` FROM (\n"
+                                    + "    SELECT `$$T_SOURCE`.`k`, 
`$$T_SOURCE`.`v`, "
+                                    + "`$$T_SOURCE`.`ts` FROM 
`default_catalog`.`default_database`.`data` $$T_SOURCE\n"
+                                    + ") $$T_PROJECT")
                     .runTableApi(
                             tableEnvAccessor ->
                                     tableEnvAccessor
@@ -807,10 +836,14 @@ public class QueryOperationTestPrograms {
                                     .consumedAfterRestore(Row.of("Apple", 3L, 
dayOfSeconds(2)))
                                     .build())
                     .runSql(
-                            "SELECT `k`, (LAST_VALUE(`v`) OVER(PARTITION BY 
`k` ORDER BY `ts` "
-                                    + "ROWS BETWEEN CAST(2 AS BIGINT) 
PRECEDING AND CURRENT ROW)) AS `_c1`, `ts` FROM (\n"
-                                    + "    SELECT `k`, `v`, `ts` FROM 
`default_catalog`.`default_database`.`data`\n"
-                                    + ")")
+                            "SELECT `$$T_PROJECT`.`k`, 
(LAST_VALUE(`$$T_PROJECT`.`v`) OVER"
+                                    + "(PARTITION BY `$$T_PROJECT`.`k` "
+                                    + "ORDER BY `$$T_PROJECT`.`ts` "
+                                    + "ROWS BETWEEN CAST(2 AS BIGINT) 
PRECEDING AND CURRENT ROW))"
+                                    + " AS `_c1`, `$$T_PROJECT`.`ts` FROM (\n"
+                                    + "    SELECT `$$T_SOURCE`.`k`, 
`$$T_SOURCE`.`v`, "
+                                    + "`$$T_SOURCE`.`ts` FROM 
`default_catalog`.`default_database`.`data` $$T_SOURCE\n"
+                                    + ") $$T_PROJECT")
                     .runTableApi(
                             tableEnvAccessor ->
                                     tableEnvAccessor
@@ -852,11 +885,13 @@ public class QueryOperationTestPrograms {
                                     .consumedAfterRestore(Row.of(3L, 
dayOfSeconds(2)))
                                     .build())
                     .runSql(
-                            "SELECT (LAST_VALUE(`v`) OVER(ORDER BY `ts` "
+                            "SELECT (LAST_VALUE(`$$T_PROJECT`.`v`) OVER(ORDER 
BY `$$T_PROJECT`"
+                                    + ".`ts` "
                                     + "ROWS BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW)) AS "
-                                    + "`_c0`, `ts` FROM (\n"
-                                    + "    SELECT `k`, `v`, `ts` FROM 
`default_catalog`.`default_database`.`data`\n"
-                                    + ")")
+                                    + "`_c0`, `$$T_PROJECT`.`ts` FROM (\n"
+                                    + "    SELECT `$$T_SOURCE`.`k`, 
`$$T_SOURCE`.`v`, "
+                                    + "`$$T_SOURCE`.`ts` FROM 
`default_catalog`.`default_database`.`data` $$T_SOURCE\n"
+                                    + ") $$T_PROJECT")
                     .runTableApi(
                             tableEnvAccessor ->
                                     tableEnvAccessor
@@ -868,4 +903,47 @@ public class QueryOperationTestPrograms {
                                             
.select($("v").lastValue().over($("w")), $("ts")),
                             "sink")
                     .build();
+
+    static final TableTestProgram ACCESSING_NESTED_COLUMN =
+            TableTestProgram.of(
+                            "project-nested-columnd",
+                            "test projection with nested columns of an inline 
type")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("data")
+                                    .addSchema("f0 bigint")
+                                    .producedBeforeRestore(Row.of(1L), 
Row.of(2L))
+                                    .producedAfterRestore(Row.of(3L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("v bigint")
+                                    .consumedBeforeRestore(Row.of(1L), 
Row.of(2L))
+                                    .consumedAfterRestore(Row.of(3L))
+                                    .build())
+                    .runSql(
+                            "SELECT 
(`$$T_PROJECT`.`composite_column`.`f0_nested`) AS `composite_column$f0_nested` 
FROM (\n"
+                                    + "    SELECT 
(CAST(ROW(`$$T_PROJECT`.`f0`, 'a') AS ROW<`f0_nested` BIGINT, `f1_nested` 
VARCHAR(2147483647)>)) AS `composite_column` FROM (\n"
+                                    + "        SELECT `$$T_SOURCE`.`f0` FROM 
`default_catalog`.`default_database`.`data` $$T_SOURCE\n"
+                                    + "    ) $$T_PROJECT\n"
+                                    + ") $$T_PROJECT")
+                    .runTableApi(
+                            tableEnvAccessor ->
+                                    tableEnvAccessor
+                                            .from("data")
+                                            .select(
+                                                    row($("f0"), lit("a"))
+                                                            .cast(
+                                                                    
DataTypes.ROW(
+                                                                            
DataTypes.FIELD(
+                                                                               
     "f0_nested",
+                                                                               
     DataTypes
+                                                                               
             .BIGINT()),
+                                                                            
DataTypes.FIELD(
+                                                                               
     "f1_nested",
+                                                                               
     DataTypes
+                                                                               
             .STRING())))
+                                                            
.as("composite_column"))
+                                            
.select($("composite_column").get("f0_nested")),
+                            "sink")
+                    .build();
 }

Reply via email to