This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 03c8185090e [FLINK-36639] Use table aliases in Table API asSerializableString (#25597)
03c8185090e is described below
commit 03c8185090ef7f91b525315ff828afdd97925c0d
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Mon Nov 4 09:11:01 2024 +0100
[FLINK-36639] Use table aliases in Table API asSerializableString (#25597)
---
.../table/operations/AggregateQueryOperation.java | 21 ++-
.../table/operations/CalculatedQueryOperation.java | 7 +-
.../table/operations/DistinctQueryOperation.java | 8 +-
.../table/operations/FilterQueryOperation.java | 10 +-
.../flink/table/operations/JoinQueryOperation.java | 29 +++-
.../flink/table/operations/OperationUtils.java | 15 +-
.../table/operations/ProjectQueryOperation.java | 7 +-
.../flink/table/operations/SetQueryOperation.java | 2 +-
.../flink/table/operations/SortQueryOperation.java | 13 +-
.../table/operations/SourceQueryOperation.java | 12 +-
.../table/operations/ValuesQueryOperation.java | 9 +-
.../operations/WindowAggregateQueryOperation.java | 14 +-
.../utils/OperationExpressionsUtils.java | 54 ++++++
.../expressions/FieldReferenceExpression.java | 26 ++-
.../table/api/QueryOperationSqlExecutionTest.java | 3 +-
.../api/QueryOperationSqlSerializationTest.java | 3 +-
.../table/api/QueryOperationTestPrograms.java | 190 +++++++++++++++------
17 files changed, 328 insertions(+), 95 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/AggregateQueryOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/AggregateQueryOperation.java
index c1e6d3a5479..9dcdaee104d 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/AggregateQueryOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/AggregateQueryOperation.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.operations;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.operations.utils.OperationExpressionsUtils;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -35,6 +36,7 @@ import java.util.stream.Stream;
@Internal
public class AggregateQueryOperation implements QueryOperation {
+ private static final String INPUT_ALIAS = "$$T_AGG";
private final List<ResolvedExpression> groupingExpressions;
private final List<ResolvedExpression> aggregateExpressions;
private final QueryOperation child;
@@ -78,11 +80,16 @@ public class AggregateQueryOperation implements
QueryOperation {
public String asSerializableString() {
final String groupingExprs = getGroupingExprs();
return String.format(
- "SELECT %s FROM (%s\n)\nGROUP BY %s",
+ "SELECT %s FROM (%s\n) %s\nGROUP BY %s",
Stream.concat(groupingExpressions.stream(),
aggregateExpressions.stream())
+ .map(
+ expr ->
+
OperationExpressionsUtils.scopeReferencesWithAlias(
+ INPUT_ALIAS, expr))
.map(ResolvedExpression::asSerializableString)
.collect(Collectors.joining(", ")),
OperationUtils.indent(child.asSerializableString()),
+ INPUT_ALIAS,
groupingExprs);
}
@@ -90,11 +97,13 @@ public class AggregateQueryOperation implements
QueryOperation {
if (groupingExpressions.isEmpty()) {
return "1";
} else {
- final String groupingExprs =
- groupingExpressions.stream()
- .map(ResolvedExpression::asSerializableString)
- .collect(Collectors.joining(", "));
- return groupingExprs;
+ return groupingExpressions.stream()
+ .map(
+ expr ->
+
OperationExpressionsUtils.scopeReferencesWithAlias(
+ INPUT_ALIAS, expr))
+ .map(ResolvedExpression::asSerializableString)
+ .collect(Collectors.joining(", "));
}
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CalculatedQueryOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CalculatedQueryOperation.java
index f4bc5ee4bcc..8b264fa3b61 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CalculatedQueryOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CalculatedQueryOperation.java
@@ -33,6 +33,8 @@ import java.util.Map;
@Internal
public class CalculatedQueryOperation implements QueryOperation {
+ public static final String INPUT_ALIAS = "$$T_LAT";
+
private final ContextResolvedFunction resolvedFunction;
private final List<ResolvedExpression> arguments;
private final ResolvedSchema resolvedSchema;
@@ -74,11 +76,12 @@ public class CalculatedQueryOperation implements
QueryOperation {
// if we ever add multi-way join in JoinQueryOperation we need to sort
out uniqueness of the
// table name
return String.format(
- "LATERAL TABLE(%s) T$0(%s)",
+ "LATERAL TABLE(%s) %s(%s)",
resolvedFunction
.toCallExpression(arguments,
resolvedSchema.toPhysicalRowDataType())
.asSerializableString(),
- OperationUtils.formatSelectColumns(resolvedSchema));
+ INPUT_ALIAS,
+ OperationUtils.formatSelectColumns(resolvedSchema, null));
}
@Override
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DistinctQueryOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DistinctQueryOperation.java
index 96f42d5016f..69995d280a8 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DistinctQueryOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DistinctQueryOperation.java
@@ -28,6 +28,7 @@ import java.util.List;
@Internal
public class DistinctQueryOperation implements QueryOperation {
+ private static final String INPUT_ALIAS = "$$T_DISTINCT";
private final QueryOperation child;
public DistinctQueryOperation(QueryOperation child) {
@@ -48,9 +49,10 @@ public class DistinctQueryOperation implements
QueryOperation {
@Override
public String asSerializableString() {
return String.format(
- "SELECT DISTINCT %s FROM (%s\n)",
- OperationUtils.formatSelectColumns(getResolvedSchema()),
- OperationUtils.indent(child.asSerializableString()));
+ "SELECT DISTINCT %s FROM (%s\n) %s",
+ OperationUtils.formatSelectColumns(getResolvedSchema(),
INPUT_ALIAS),
+ OperationUtils.indent(child.asSerializableString()),
+ INPUT_ALIAS);
}
@Override
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/FilterQueryOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/FilterQueryOperation.java
index 97ffb91bbc0..14639b91295 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/FilterQueryOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/FilterQueryOperation.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.operations;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.operations.utils.OperationExpressionsUtils;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -31,6 +32,7 @@ import java.util.Map;
@Internal
public class FilterQueryOperation implements QueryOperation {
+ private static final String INPUT_ALIAS = "$$T_FILTER";
private final ResolvedExpression condition;
private final QueryOperation child;
@@ -60,10 +62,12 @@ public class FilterQueryOperation implements QueryOperation
{
@Override
public String asSerializableString() {
return String.format(
- "SELECT %s FROM (%s\n) WHERE %s",
- OperationUtils.formatSelectColumns(getResolvedSchema()),
+ "SELECT %s FROM (%s\n) %s WHERE %s",
+ OperationUtils.formatSelectColumns(getResolvedSchema(),
INPUT_ALIAS),
OperationUtils.indent(child.asSerializableString()),
- condition.asSerializableString());
+ INPUT_ALIAS,
+
OperationExpressionsUtils.scopeReferencesWithAlias(INPUT_ALIAS, condition)
+ .asSerializableString());
}
@Override
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/JoinQueryOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/JoinQueryOperation.java
index bd139ca3d10..7c785a29009 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/JoinQueryOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/JoinQueryOperation.java
@@ -21,8 +21,10 @@ package org.apache.flink.table.operations;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.operations.utils.OperationExpressionsUtils;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -33,6 +35,8 @@ import java.util.stream.Stream;
@Internal
public class JoinQueryOperation implements QueryOperation {
+ private static final String INPUT_1_ALIAS = "$$T1_JOIN";
+ private static final String INPUT_2_ALIAS = "$$T2_JOIN";
private final QueryOperation left;
private final QueryOperation right;
private final JoinType joinType;
@@ -108,13 +112,30 @@ public class JoinQueryOperation implements QueryOperation
{
@Override
public String asSerializableString() {
+
+ Map<Integer, String> inputAliases = new HashMap<>();
+ inputAliases.put(0, INPUT_1_ALIAS);
+ inputAliases.put(1, correlated ? CalculatedQueryOperation.INPUT_ALIAS
: INPUT_2_ALIAS);
+
return String.format(
- "SELECT %s FROM (%s\n) %s JOIN %s ON %s",
- OperationUtils.formatSelectColumns(resolvedSchema),
+ "SELECT %s FROM (%s\n) %s %s JOIN %s ON %s",
+ getSelectList(),
OperationUtils.indent(left.asSerializableString()),
+ INPUT_1_ALIAS,
joinType.toString().replaceAll("_", " "),
rightToSerializable(),
- condition.asSerializableString());
+
OperationExpressionsUtils.scopeReferencesWithAlias(inputAliases, condition)
+ .asSerializableString());
+ }
+
+ private String getSelectList() {
+ String leftColumns =
+ OperationUtils.formatSelectColumns(left.getResolvedSchema(),
INPUT_1_ALIAS);
+ String rightColumns =
+ OperationUtils.formatSelectColumns(
+ right.getResolvedSchema(),
+ correlated ? CalculatedQueryOperation.INPUT_ALIAS :
INPUT_2_ALIAS);
+ return leftColumns + ", " + rightColumns;
}
private String rightToSerializable() {
@@ -125,6 +146,8 @@ public class JoinQueryOperation implements QueryOperation {
s.append(OperationUtils.indent(right.asSerializableString()));
if (!correlated) {
s.append("\n)");
+ s.append(" ");
+ s.append(INPUT_2_ALIAS);
}
return s.toString();
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java
index ab9a8675f88..2455e4a344c 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.utils.EncodingUtils;
import org.apache.flink.util.StringUtils;
+import javax.annotation.Nullable;
+
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -118,9 +120,18 @@ public class OperationUtils {
return stringBuilder.append(childrenDescription).toString();
}
- public static String formatSelectColumns(ResolvedSchema schema) {
+ public static String formatSelectColumns(ResolvedSchema schema, @Nullable
String inputAlias) {
return schema.getColumnNames().stream()
- .map(EncodingUtils::escapeIdentifier)
+ .map(
+ i -> {
+ if (inputAlias == null) {
+ return EncodingUtils.escapeIdentifier(i);
+ }
+ return String.format(
+ "%s.%s",
+ EncodingUtils.escapeIdentifier(inputAlias),
+ EncodingUtils.escapeIdentifier(i));
+ })
.collect(Collectors.joining(", "));
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ProjectQueryOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ProjectQueryOperation.java
index f5359043fb9..b2ebae350c7 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ProjectQueryOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ProjectQueryOperation.java
@@ -42,6 +42,7 @@ import java.util.stream.IntStream;
@Internal
public class ProjectQueryOperation implements QueryOperation {
+ private static final String INPUT_ALIAS = "$$T_PROJECT";
private final List<ResolvedExpression> projectList;
private final QueryOperation child;
private final ResolvedSchema resolvedSchema;
@@ -76,9 +77,13 @@ public class ProjectQueryOperation implements QueryOperation
{
@Override
public String asSerializableString() {
return String.format(
- "SELECT %s FROM (%s\n)",
+ "SELECT %s FROM (%s\n) " + INPUT_ALIAS,
IntStream.range(0, projectList.size())
.mapToObj(this::alias)
+ .map(
+ expr ->
+
OperationExpressionsUtils.scopeReferencesWithAlias(
+ INPUT_ALIAS, expr))
.map(ResolvedExpression::asSerializableString)
.collect(Collectors.joining(", ")),
OperationUtils.indent(child.asSerializableString()));
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SetQueryOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SetQueryOperation.java
index c43cc7ca1ef..dbbae5c0398 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SetQueryOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SetQueryOperation.java
@@ -88,7 +88,7 @@ public class SetQueryOperation implements QueryOperation {
public String asSerializableString() {
return String.format(
"SELECT %s FROM (%s\n) %s (%s\n)",
- OperationUtils.formatSelectColumns(resolvedSchema),
+ OperationUtils.formatSelectColumns(resolvedSchema, null),
OperationUtils.indent(leftOperation.asSerializableString()),
asSerializableType(),
OperationUtils.indent(rightOperation.asSerializableString()));
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SortQueryOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SortQueryOperation.java
index d447bdf58ac..3575f90350b 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SortQueryOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SortQueryOperation.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.operations;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.operations.utils.OperationExpressionsUtils;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -35,6 +36,7 @@ import java.util.stream.Collectors;
@Internal
public class SortQueryOperation implements QueryOperation {
+ private static final String INPUT_ALIAS = "$$T_SORT";
private final List<ResolvedExpression> order;
private final QueryOperation child;
private final int offset;
@@ -89,10 +91,17 @@ public class SortQueryOperation implements QueryOperation {
final StringBuilder s =
new StringBuilder(
String.format(
- "SELECT %s FROM (%s\n) ORDER BY %s",
-
OperationUtils.formatSelectColumns(getResolvedSchema()),
+ "SELECT %s FROM (%s\n) %s ORDER BY %s",
+ OperationUtils.formatSelectColumns(
+ getResolvedSchema(), INPUT_ALIAS),
OperationUtils.indent(child.asSerializableString()),
+ INPUT_ALIAS,
order.stream()
+ .map(
+ expr ->
+
OperationExpressionsUtils
+
.scopeReferencesWithAlias(
+
INPUT_ALIAS, expr))
.map(ResolvedExpression::asSerializableString)
.collect(Collectors.joining(", "))));
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SourceQueryOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SourceQueryOperation.java
index 926c79c52e3..203e5b47109 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SourceQueryOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SourceQueryOperation.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.utils.EncodingUtils;
import javax.annotation.Nullable;
@@ -31,7 +30,6 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
/**
* Describes a query operation from a {@link ContextResolvedTable}.
@@ -43,6 +41,7 @@ import java.util.stream.Collectors;
@Internal
public class SourceQueryOperation implements QueryOperation {
+ private static final String INPUT_ALIAS = "$$T_SOURCE";
private final ContextResolvedTable contextResolvedTable;
private final @Nullable Map<String, String> dynamicOptions;
@@ -83,11 +82,10 @@ public class SourceQueryOperation implements QueryOperation
{
public String asSerializableString() {
String s =
String.format(
- "SELECT %s FROM %s",
- getResolvedSchema().getColumnNames().stream()
- .map(EncodingUtils::escapeIdentifier)
- .collect(Collectors.joining(", ")),
-
getContextResolvedTable().getIdentifier().asSerializableString());
+ "SELECT %s FROM %s %s",
+
OperationUtils.formatSelectColumns(getResolvedSchema(), INPUT_ALIAS),
+
getContextResolvedTable().getIdentifier().asSerializableString(),
+ INPUT_ALIAS);
if (dynamicOptions != null && !dynamicOptions.isEmpty()) {
throw new TableException("Dynamic source options are not SQL
serializable yet.");
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ValuesQueryOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ValuesQueryOperation.java
index 3400529296f..39849cd77f5 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ValuesQueryOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ValuesQueryOperation.java
@@ -37,6 +37,7 @@ import java.util.stream.Collectors;
@Internal
public class ValuesQueryOperation implements QueryOperation {
+ private static final String INPUT_ALIAS = "$$T_VAL";
private final List<List<ResolvedExpression>> values;
private final ResolvedSchema resolvedSchema;
@@ -66,10 +67,9 @@ public class ValuesQueryOperation implements QueryOperation {
@Override
public String asSerializableString() {
- final String selectColumns =
OperationUtils.formatSelectColumns(resolvedSchema);
return String.format(
- "SELECT %s FROM (VALUES %s\n) VAL$0(%s)",
- selectColumns,
+ "SELECT %s FROM (VALUES %s\n) %s(%s)",
+ OperationUtils.formatSelectColumns(resolvedSchema,
INPUT_ALIAS),
OperationUtils.indent(
values.stream()
.map(
@@ -81,7 +81,8 @@ public class ValuesQueryOperation implements QueryOperation {
.collect(
Collectors.joining(", ", "(", ")")))
.collect(Collectors.joining(",\n"))),
- selectColumns);
+ INPUT_ALIAS,
+ OperationUtils.formatSelectColumns(resolvedSchema, null));
}
@Override
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/WindowAggregateQueryOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/WindowAggregateQueryOperation.java
index 8eea2d4b9cb..0b13fb6ddcb 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/WindowAggregateQueryOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/WindowAggregateQueryOperation.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.operations.utils.OperationExpressionsUtils;
import org.apache.flink.util.StringUtils;
import javax.annotation.Nullable;
@@ -50,6 +51,7 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public class WindowAggregateQueryOperation implements QueryOperation {
+ private static final String INPUT_ALIAS = "$$T_WIN_AGG";
private final List<ResolvedExpression> groupingExpressions;
private final List<ResolvedExpression> aggregateExpressions;
private final List<ResolvedExpression> windowPropertiesExpressions;
@@ -92,19 +94,29 @@ public class WindowAggregateQueryOperation implements
QueryOperation {
@Override
public String asSerializableString() {
return String.format(
- "SELECT %s FROM TABLE(%s\n) GROUP BY %s",
+ "SELECT %s FROM TABLE(%s\n) %s GROUP BY %s",
Stream.of(
groupingExpressions.stream(),
aggregateExpressions.stream(),
windowPropertiesExpressions.stream())
.flatMap(Function.identity())
+ .map(
+ expr ->
+
OperationExpressionsUtils.scopeReferencesWithAlias(
+ INPUT_ALIAS, expr))
.map(ResolvedExpression::asSerializableString)
.collect(Collectors.joining(", ")),
OperationUtils.indent(
groupWindow.asSerializableString(child.asSerializableString())),
+ INPUT_ALIAS,
Stream.concat(
Stream.of("window_start", "window_end"),
groupingExpressions.stream()
+ .map(
+ expr ->
+
OperationExpressionsUtils
+
.scopeReferencesWithAlias(
+
INPUT_ALIAS, expr))
.map(ResolvedExpression::asSerializableString))
.collect(Collectors.joining(", ")));
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationExpressionsUtils.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationExpressionsUtils.java
index 20342f0d746..4d7af03afa8 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationExpressionsUtils.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationExpressionsUtils.java
@@ -28,10 +28,12 @@ import
org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.TableReferenceExpression;
import org.apache.flink.table.expressions.UnresolvedCallExpression;
import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
+import
org.apache.flink.table.expressions.utils.ResolvedExpressionDefaultVisitor;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.operations.QueryOperation;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -287,5 +289,57 @@ public class OperationExpressionsUtils {
}
}
+ /**
+ * Adds an input alias to all {@link FieldReferenceExpression} in the
given {@code expression}.
+ */
+ public static ResolvedExpression scopeReferencesWithAlias(
+ final String aliasName, final ResolvedExpression expression) {
+ return expression.accept(
+ new TableReferenceScopingVisitor(Collections.singletonMap(0,
aliasName)));
+ }
+
+ /**
+ * Adds an input alias to all {@link FieldReferenceExpression} in the
given {@code expression}.
+ * This method accepts multiple aliases for given input indices.
+ */
+ public static ResolvedExpression scopeReferencesWithAlias(
+ final Map<Integer, String> inputAliases, final ResolvedExpression
expression) {
+ return expression.accept(new
TableReferenceScopingVisitor(inputAliases));
+ }
+
+ private static class TableReferenceScopingVisitor
+ extends ResolvedExpressionDefaultVisitor<ResolvedExpression> {
+
+ private final Map<Integer, String> inputAliases;
+
+ private TableReferenceScopingVisitor(Map<Integer, String>
inputAliases) {
+ this.inputAliases = inputAliases;
+ }
+
+ @Override
+ public ResolvedExpression visit(CallExpression call) {
+ List<ResolvedExpression> scopedChildren =
+ call.getChildren().stream()
+ .map(c -> c.accept(this))
+ .collect(Collectors.toList());
+ return call.replaceArgs(scopedChildren, call.getOutputDataType());
+ }
+
+ @Override
+ public ResolvedExpression visit(FieldReferenceExpression
fieldReference) {
+ return new FieldReferenceExpression(
+ fieldReference.getName(),
+ fieldReference.getOutputDataType(),
+ fieldReference.getInputIndex(),
+ fieldReference.getFieldIndex(),
+ inputAliases.get(fieldReference.getInputIndex()));
+ }
+
+ @Override
+ protected ResolvedExpression defaultMethod(ResolvedExpression
expression) {
+ return expression;
+ }
+ }
+
private OperationExpressionsUtils() {}
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/FieldReferenceExpression.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/FieldReferenceExpression.java
index 42f19ca3d66..ed835aac7d2 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/FieldReferenceExpression.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/FieldReferenceExpression.java
@@ -23,6 +23,8 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.EncodingUtils;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
+
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -34,6 +36,7 @@ import java.util.Objects;
* <li>type
* <li>index of an input the field belongs to
* <li>index of a field within the corresponding input
+ * <li>optional: alias of the input, if it needs to be referenced by name
* </ul>
*/
@PublicEvolving
@@ -52,14 +55,26 @@ public final class FieldReferenceExpression implements
ResolvedExpression {
/** index of a field within the corresponding input. */
private final int fieldIndex;
+ private final @Nullable String inputAlias;
+
public FieldReferenceExpression(
String name, DataType dataType, int inputIndex, int fieldIndex) {
+ this(name, dataType, inputIndex, fieldIndex, null);
+ }
+
+ public FieldReferenceExpression(
+ String name,
+ DataType dataType,
+ int inputIndex,
+ int fieldIndex,
+ @Nullable String inputAlias) {
Preconditions.checkArgument(inputIndex >= 0, "Index of input should be
a positive number");
Preconditions.checkArgument(fieldIndex >= 0, "Index of field should be
a positive number");
this.name = Preconditions.checkNotNull(name, "Field name must not be
null.");
this.dataType = Preconditions.checkNotNull(dataType, "Field data type
must not be null.");
this.inputIndex = inputIndex;
this.fieldIndex = fieldIndex;
+ this.inputAlias = inputAlias;
}
public String getName() {
@@ -91,6 +106,12 @@ public final class FieldReferenceExpression implements
ResolvedExpression {
@Override
public String asSerializableString() {
+ if (inputAlias != null) {
+ return String.format(
+ "%s.%s",
+ EncodingUtils.escapeIdentifier(inputAlias),
+ EncodingUtils.escapeIdentifier(name));
+ }
return EncodingUtils.escapeIdentifier(name);
}
@@ -116,12 +137,13 @@ public final class FieldReferenceExpression implements
ResolvedExpression {
return name.equals(that.name)
&& dataType.equals(that.dataType)
&& inputIndex == that.inputIndex
- && fieldIndex == that.fieldIndex;
+ && fieldIndex == that.fieldIndex
+ && Objects.equals(inputAlias, that.inputAlias);
}
@Override
public int hashCode() {
- return Objects.hash(name, dataType, inputIndex, fieldIndex);
+ return Objects.hash(name, dataType, inputIndex, fieldIndex,
inputAlias);
}
@Override
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java
index 04526d7b29c..0fac184fb42 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java
@@ -74,7 +74,8 @@ public class QueryOperationSqlExecutionTest implements
TableTestProgramRunner {
QueryOperationTestPrograms.SQL_QUERY_OPERATION,
QueryOperationTestPrograms.OVER_WINDOW_RANGE,
QueryOperationTestPrograms.OVER_WINDOW_ROWS,
-
QueryOperationTestPrograms.OVER_WINDOW_ROWS_UNBOUNDED_NO_PARTITION);
+
QueryOperationTestPrograms.OVER_WINDOW_ROWS_UNBOUNDED_NO_PARTITION,
+ QueryOperationTestPrograms.ACCESSING_NESTED_COLUMN);
}
@ParameterizedTest
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
index 285989247b6..a006b1a46c4 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
@@ -63,7 +63,8 @@ public class QueryOperationSqlSerializationTest implements
TableTestProgramRunne
QueryOperationTestPrograms.SQL_QUERY_OPERATION,
QueryOperationTestPrograms.OVER_WINDOW_RANGE,
QueryOperationTestPrograms.OVER_WINDOW_ROWS,
-
QueryOperationTestPrograms.OVER_WINDOW_ROWS_UNBOUNDED_NO_PARTITION);
+
QueryOperationTestPrograms.OVER_WINDOW_ROWS_UNBOUNDED_NO_PARTITION,
+ QueryOperationTestPrograms.ACCESSING_NESTED_COLUMN);
}
@ParameterizedTest
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
index 23c2bd30e82..a53d3d3eda2 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
@@ -59,7 +59,10 @@ public class QueryOperationTestPrograms {
.consumedValues(Row.of(1L, "abc"),
Row.of(2L, "cde"))
.build())
.runTableApi(t -> t.from("s"), "sink")
- .runSql("SELECT `a`, `b` FROM
`default_catalog`.`default_database`.`s`")
+ .runSql(
+ "SELECT `$$T_SOURCE`.`a`, `$$T_SOURCE`.`b` FROM
`default_catalog`"
+ + ".`default_database`.`s` "
+ + "$$T_SOURCE")
.build();
static final TableTestProgram VALUES_QUERY_OPERATION =
@@ -71,10 +74,10 @@ public class QueryOperationTestPrograms {
.build())
.runTableApi(t -> t.fromValues(row(1L, "abc"), row(2L,
"cde")), "sink")
.runSql(
- "SELECT `f0`, `f1` FROM (VALUES \n"
+ "SELECT `$$T_VAL`.`f0`, `$$T_VAL`.`f1` FROM
(VALUES \n"
+ " (CAST(1 AS BIGINT), 'abc'),\n"
+ " (CAST(2 AS BIGINT), 'cde')\n"
- + ") VAL$0(`f0`, `f1`)")
+ + ") $$T_VAL(`f0`, `f1`)")
.build();
static final TableTestProgram FILTER_QUERY_OPERATION =
@@ -91,9 +94,10 @@ public class QueryOperationTestPrograms {
.build())
.runTableApi(t ->
t.from("s").where($("a").isGreaterOrEqual(15)), "sink")
.runSql(
- "SELECT `a`, `b` FROM (\n"
- + " SELECT `a`, `b` FROM
`default_catalog`.`default_database`.`s`\n"
- + ") WHERE `a` >= 15")
+ "SELECT `$$T_FILTER`.`a`, `$$T_FILTER`.`b` FROM
(\n"
+ + " SELECT `$$T_SOURCE`.`a`,
`$$T_SOURCE`.`b` FROM `default_catalog`"
+ + ".`default_database`.`s` $$T_SOURCE\n"
+ + ") $$T_FILTER WHERE `$$T_FILTER`.`a` >=
15")
.build();
static final TableTestProgram DISTINCT_QUERY_OPERATION =
@@ -114,11 +118,12 @@ public class QueryOperationTestPrograms {
.runTableApi(
t -> t.from("s").where($("a").isGreaterOrEqual(15)).distinct(), "sink")
.runSql(
- "SELECT DISTINCT `a`, `b` FROM (\n"
- + " SELECT `a`, `b` FROM (\n"
- + " SELECT `a`, `b` FROM `default_catalog`.`default_database`.`s`\n"
- + " ) WHERE `a` >= 15\n"
- + ")")
+ "SELECT DISTINCT `$$T_DISTINCT`.`a`, `$$T_DISTINCT`.`b` FROM (\n"
+ + " SELECT `$$T_FILTER`.`a`, `$$T_FILTER`.`b` FROM (\n"
+ + " SELECT `$$T_SOURCE`.`a`, `$$T_SOURCE`.`b` FROM `default_catalog`"
+ + ".`default_database`.`s` $$T_SOURCE\n"
+ + " ) $$T_FILTER WHERE `$$T_FILTER`.`a` >= 15\n"
+ + ") $$T_DISTINCT")
.build();
static final TableTestProgram AGGREGATE_QUERY_OPERATION =
@@ -140,12 +145,14 @@ public class QueryOperationTestPrograms {
.runTableApi(
t -> t.from("s").groupBy($("b")).select($("b"), $("a").sum()), "sink")
.runSql(
- "SELECT `b`, `EXPR$0` FROM (\n"
- + " SELECT `b`, (SUM(`a`)) AS `EXPR$0` FROM (\n"
- + " SELECT `a`, `b` FROM `default_catalog`.`default_database`.`s`\n"
- + " )\n"
- + " GROUP BY `b`\n"
- + ")")
+ "SELECT `$$T_PROJECT`.`b`, `$$T_PROJECT`.`EXPR$0` FROM (\n"
+ + " SELECT `$$T_AGG`.`b`, (SUM(`$$T_AGG`.`a`)) AS `EXPR$0`"
+ + " FROM (\n"
+ + " SELECT `$$T_SOURCE`.`a`, `$$T_SOURCE`.`b` FROM "
+ + "`default_catalog`.`default_database`.`s` $$T_SOURCE\n"
+ + " ) $$T_AGG\n"
+ + " GROUP BY `$$T_AGG`.`b`\n"
+ + ") $$T_PROJECT")
.build();
static final TableTestProgram AGGREGATE_NO_GROUP_BY_QUERY_OPERATION =
@@ -167,12 +174,13 @@ public class QueryOperationTestPrograms {
.build())
.runTableApi(t -> t.from("s").select($("a").sum()), "sink")
.runSql(
- "SELECT `EXPR$0` FROM (\n"
- + " SELECT (SUM(`a`)) AS `EXPR$0` FROM (\n"
- + " SELECT `a`, `b` FROM `default_catalog`.`default_database`.`s`\n"
- + " )\n"
+ "SELECT `$$T_PROJECT`.`EXPR$0` FROM (\n"
+ + " SELECT (SUM(`$$T_AGG`.`a`)) AS `EXPR$0` FROM (\n"
+ + " SELECT `$$T_SOURCE`.`a`, `$$T_SOURCE`.`b` FROM "
+ + "`default_catalog`.`default_database`.`s` $$T_SOURCE\n"
+ + " ) $$T_AGG\n"
+ " GROUP BY 1\n"
- + ")")
+ + ") $$T_PROJECT")
.build();
static final TableTestProgram WINDOW_AGGREGATE_QUERY_OPERATION =
@@ -207,13 +215,14 @@ public class QueryOperationTestPrograms {
.select($("b"), $("w").start(), $("a").sum()),
"sink")
.runSql(
- "SELECT `b`, `EXPR$0`, `EXPR$1` FROM (\n"
- + " SELECT `b`, (SUM(`a`)) AS `EXPR$1`, (window_start) AS `EXPR$0` FROM TABLE(\n"
+ "SELECT `$$T_PROJECT`.`b`, `$$T_PROJECT`.`EXPR$0`, `$$T_PROJECT`.`EXPR$1` FROM (\n"
+ + " SELECT `$$T_WIN_AGG`.`b`, (SUM(`$$T_WIN_AGG`.`a`)) AS `EXPR$1`, (window_start) AS `EXPR$0` FROM TABLE(\n"
+ " TUMBLE((\n"
- + " SELECT `a`, `b`, `ts` FROM `default_catalog`.`default_database`.`s`\n"
+ + " SELECT `$$T_SOURCE`.`a`, `$$T_SOURCE`.`b`, "
+ + "`$$T_SOURCE`.`ts` FROM `default_catalog`.`default_database`.`s` $$T_SOURCE\n"
+ " ), DESCRIPTOR(`ts`), INTERVAL '0 00:00:05.0' DAY TO SECOND(3))\n"
- + " ) GROUP BY window_start, window_end, `b`\n"
- + ")")
+ + " ) $$T_WIN_AGG GROUP BY window_start, window_end, `$$T_WIN_AGG`.`b`\n"
+ + ") $$T_PROJECT")
.build();
private static Instant dayOfSeconds(int second) {
@@ -259,13 +268,22 @@ public class QueryOperationTestPrograms {
.select($("name"), $("d_name"), $("age")),
"sink")
.runSql(
- "SELECT `name`, `d_name`, `age` FROM (\n"
- + " SELECT `emp_id`, `e_dept_id`, `name`, `age`, `dept_id`, `d_name` FROM (\n"
- + " SELECT `emp_id`, `e_dept_id`, `name`, `age` FROM `default_catalog`.`default_database`.`e`\n"
- + " ) INNER JOIN (\n"
- + " SELECT `dept_id`, `d_name` FROM `default_catalog`.`default_database`.`d`\n"
- + " ) ON (`e_dept_id` = `dept_id`) AND (`age` >= 21)\n"
- + ")")
+ "SELECT `$$T_PROJECT`.`name`, `$$T_PROJECT`.`d_name`, `$$T_PROJECT`.`age` FROM (\n"
+ + " SELECT `$$T1_JOIN`.`emp_id`, `$$T1_JOIN`.`e_dept_id`, "
+ + "`$$T1_JOIN`.`name`, "
+ + "`$$T1_JOIN`.`age`, `$$T2_JOIN`.`dept_id`, `$$T2_JOIN`"
+ + ".`d_name` FROM (\n"
+ + " SELECT `$$T_SOURCE`.`emp_id`, `$$T_SOURCE`"
+ + ".`e_dept_id`, `$$T_SOURCE`.`name`, `$$T_SOURCE`.`age` FROM `default_catalog`.`default_database`.`e` $$T_SOURCE\n"
+ + " ) $$T1_JOIN INNER JOIN (\n"
+ + " SELECT `$$T_SOURCE`.`dept_id`, `$$T_SOURCE`"
+ + ".`d_name` FROM `default_catalog`.`default_database`.`d` $$T_SOURCE\n"
+ + " ) $$T2_JOIN ON (`$$T1_JOIN`.`e_dept_id` = `$$T2_JOIN`"
+ + ".`dept_id`)"
+ + " AND "
+ + "(`$$T1_JOIN`.`age` "
+ + ">= 21)\n"
+ + ") $$T_PROJECT")
.build();
static final TableTestProgram LATERAL_JOIN_QUERY_OPERATION =
@@ -285,10 +303,11 @@ public class QueryOperationTestPrograms {
.runTableApi(
t -> t.from("e").joinLateral(call("udtf", $("b")).as("f0")), "sink")
.runSql(
- "SELECT `a`, `b`, `f0` FROM (\n"
- + " SELECT `a`, `b` FROM `default_catalog`.`default_database`.`e`\n"
- + ") INNER JOIN \n"
- + " LATERAL TABLE(`default_catalog`.`default_database`.`udtf`(`b`)) T$0(`f0`) ON TRUE")
+ "SELECT `$$T1_JOIN`.`a`, `$$T1_JOIN`.`b`, `$$T_LAT`.`f0` FROM (\n"
+ + " SELECT `$$T_SOURCE`.`a`, `$$T_SOURCE`.`b` FROM "
+ + "`default_catalog`.`default_database`.`e` $$T_SOURCE\n"
+ + ") $$T1_JOIN INNER JOIN \n"
+ + " LATERAL TABLE(`default_catalog`.`default_database`.`udtf`(`b`)) $$T_LAT(`f0`) ON TRUE")
.build();
static final TableTestProgram UNION_ALL_QUERY_OPERATION =
@@ -311,9 +330,11 @@ public class QueryOperationTestPrograms {
.runTableApi(t -> t.from("s").unionAll(t.from("t")), "sink")
.runSql(
"SELECT `a`, `b` FROM (\n"
- + " SELECT `a`, `b` FROM `default_catalog`.`default_database`.`s`\n"
+ + " SELECT `$$T_SOURCE`.`a`, `$$T_SOURCE`.`b` FROM "
+ + "`default_catalog`.`default_database`.`s` $$T_SOURCE\n"
+ ") UNION ALL (\n"
- + " SELECT `a`, `b` FROM `default_catalog`.`default_database`.`t`\n"
+ + " SELECT `$$T_SOURCE`.`a`, `$$T_SOURCE`.`b` FROM "
+ + "`default_catalog`.`default_database`.`t` $$T_SOURCE\n"
+ ")")
.build();
@@ -338,9 +359,11 @@ public class QueryOperationTestPrograms {
t -> t.from("s").orderBy($("a"), $("b").desc()).offset(1).fetch(2),
"sink")
.runSql(
- "SELECT `a`, `b` FROM (\n"
- + " SELECT `a`, `b` FROM `default_catalog`.`default_database`.`s`\n"
- + ") ORDER BY `a` ASC, `b` DESC OFFSET 1 ROWS FETCH NEXT 2 ROWS ONLY")
+ "SELECT `$$T_SORT`.`a`, `$$T_SORT`.`b` FROM (\n"
+ + " SELECT `$$T_SOURCE`.`a`, `$$T_SOURCE`.`b` FROM `default_catalog`"
+ + ".`default_database`.`s` $$T_SOURCE\n"
+ + ") $$T_SORT ORDER BY `$$T_SORT`.`a` ASC, `$$T_SORT`.`b` DESC"
+ + " OFFSET 1 ROWS FETCH NEXT 2 ROWS ONLY")
.build();
static final TableTestProgram SQL_QUERY_OPERATION =
@@ -361,10 +384,12 @@ public class QueryOperationTestPrograms {
.select($("a").plus(2), $("b").substr(2, 3)),
"sink")
.runSql(
- "SELECT (`a` + 2) AS `_c0`, (SUBSTR(`b`, 2, 3)) AS `_c1` FROM (\n"
+ "SELECT (`$$T_PROJECT`.`a` + 2) AS `_c0`, (SUBSTR(`$$T_PROJECT`.`b`, "
+ + "2, 3)) AS "
+ + "`_c1` FROM (\n"
+ " SELECT `s`.`a`, `s`.`b`\n"
+ " FROM `default_catalog`.`default_database`.`s` AS `s`\n"
- + ")")
+ + ") $$T_PROJECT")
.build();
static final TableTestProgram GROUP_HOP_WINDOW_EVENT_TIME =
@@ -765,9 +790,13 @@ public class QueryOperationTestPrograms {
.consumedAfterRestore(Row.of("Apple", 3L, dayOfSeconds(2)))
.build())
.runSql(
- "SELECT `k`, (LAST_VALUE(`v`) OVER(PARTITION BY `k` ORDER BY `ts` RANGE BETWEEN INTERVAL '0 00:00:02.0' DAY TO SECOND(3) PRECEDING AND CURRENT ROW)) AS `_c1`, `ts` FROM (\n"
- + " SELECT `k`, `v`, `ts` FROM `default_catalog`.`default_database`.`data`\n"
- + ")")
+ "SELECT `$$T_PROJECT`.`k`, (LAST_VALUE(`$$T_PROJECT`.`v`) "
+ + "OVER(PARTITION BY `$$T_PROJECT`.`k` "
+ + "ORDER BY `$$T_PROJECT`.`ts` RANGE BETWEEN INTERVAL '0 "
+ + "00:00:02.0' DAY TO SECOND(3) PRECEDING AND CURRENT ROW)) AS `_c1`, `$$T_PROJECT`.`ts` FROM (\n"
+ + " SELECT `$$T_SOURCE`.`k`, `$$T_SOURCE`.`v`, "
+ + "`$$T_SOURCE`.`ts` FROM `default_catalog`.`default_database`.`data` $$T_SOURCE\n"
+ + ") $$T_PROJECT")
.runTableApi(
tableEnvAccessor ->
tableEnvAccessor
@@ -807,10 +836,14 @@ public class QueryOperationTestPrograms {
.consumedAfterRestore(Row.of("Apple", 3L, dayOfSeconds(2)))
.build())
.runSql(
- "SELECT `k`, (LAST_VALUE(`v`) OVER(PARTITION BY `k` ORDER BY `ts` "
- + "ROWS BETWEEN CAST(2 AS BIGINT) PRECEDING AND CURRENT ROW)) AS `_c1`, `ts` FROM (\n"
- + " SELECT `k`, `v`, `ts` FROM `default_catalog`.`default_database`.`data`\n"
- + ")")
+ "SELECT `$$T_PROJECT`.`k`, (LAST_VALUE(`$$T_PROJECT`.`v`) OVER"
+ + "(PARTITION BY `$$T_PROJECT`.`k` "
+ + "ORDER BY `$$T_PROJECT`.`ts` "
+ + "ROWS BETWEEN CAST(2 AS BIGINT) PRECEDING AND CURRENT ROW))"
+ + " AS `_c1`, `$$T_PROJECT`.`ts` FROM (\n"
+ + " SELECT `$$T_SOURCE`.`k`, `$$T_SOURCE`.`v`, "
+ + "`$$T_SOURCE`.`ts` FROM `default_catalog`.`default_database`.`data` $$T_SOURCE\n"
+ + ") $$T_PROJECT")
.runTableApi(
tableEnvAccessor ->
tableEnvAccessor
@@ -852,11 +885,13 @@ public class QueryOperationTestPrograms {
.consumedAfterRestore(Row.of(3L, dayOfSeconds(2)))
.build())
.runSql(
- "SELECT (LAST_VALUE(`v`) OVER(ORDER BY `ts` "
+ "SELECT (LAST_VALUE(`$$T_PROJECT`.`v`) OVER(ORDER BY `$$T_PROJECT`"
+ + ".`ts` "
+ "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) AS "
- + "`_c0`, `ts` FROM (\n"
- + " SELECT `k`, `v`, `ts` FROM `default_catalog`.`default_database`.`data`\n"
- + ")")
+ + "`_c0`, `$$T_PROJECT`.`ts` FROM (\n"
+ + " SELECT `$$T_SOURCE`.`k`, `$$T_SOURCE`.`v`, "
+ + "`$$T_SOURCE`.`ts` FROM `default_catalog`.`default_database`.`data` $$T_SOURCE\n"
+ + ") $$T_PROJECT")
.runTableApi(
tableEnvAccessor ->
tableEnvAccessor
@@ -868,4 +903,47 @@ public class QueryOperationTestPrograms {
.select($("v").lastValue().over($("w")), $("ts")),
"sink")
.build();
+
+ static final TableTestProgram ACCESSING_NESTED_COLUMN =
+ TableTestProgram.of(
+ "project-nested-columnd",
+ "test projection with nested columns of an inline type")
+ .setupTableSource(
+ SourceTestStep.newBuilder("data")
+ .addSchema("f0 bigint")
+ .producedBeforeRestore(Row.of(1L), Row.of(2L))
+ .producedAfterRestore(Row.of(3L))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("v bigint")
+ .consumedBeforeRestore(Row.of(1L), Row.of(2L))
+ .consumedAfterRestore(Row.of(3L))
+ .build())
+ .runSql(
+ "SELECT (`$$T_PROJECT`.`composite_column`.`f0_nested`) AS `composite_column$f0_nested` FROM (\n"
+ + " SELECT (CAST(ROW(`$$T_PROJECT`.`f0`, 'a') AS ROW<`f0_nested` BIGINT, `f1_nested` VARCHAR(2147483647)>)) AS `composite_column` FROM (\n"
+ + " SELECT `$$T_SOURCE`.`f0` FROM `default_catalog`.`default_database`.`data` $$T_SOURCE\n"
+ + " ) $$T_PROJECT\n"
+ + ") $$T_PROJECT")
+ .runTableApi(
+ tableEnvAccessor ->
+ tableEnvAccessor
+ .from("data")
+ .select(
+ row($("f0"), lit("a"))
+ .cast(
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ "f0_nested",
+ DataTypes
+ .BIGINT()),
+ DataTypes.FIELD(
+ "f1_nested",
+ DataTypes
+ .STRING())))
+ .as("composite_column"))
+ .select($("composite_column").get("f0_nested")),
+ "sink")
+ .build();
}