This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8c1968ac2880c3784c6c35ffb819932483208807 Author: Timo Walther <twal...@apache.org> AuthorDate: Mon Jul 1 09:12:41 2019 +0200 [FLINK-13028][table-api-java] Refactor local over windows --- .../flink/table/expressions/LocalOverWindow.java | 76 ++++++++++++++++++++++ .../table/expressions/ExpressionResolver.java | 21 +++--- .../expressions/rules/OverWindowResolverRule.java | 16 ++--- .../table/expressions/rules/ResolverRule.java | 6 +- .../flink/table/plan/logical/groupWindows.scala | 15 +---- 5 files changed, 98 insertions(+), 36 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java new file mode 100644 index 0000000..45e1a7a --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions; + +import org.apache.flink.annotation.Internal; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Optional; + +/** + * Local over window created during expression resolution. + */ +@Internal +public final class LocalOverWindow { + + private Expression alias; + + private List<Expression> partitionBy; + + private Expression orderBy; + + private Expression preceding; + + private @Nullable Expression following; + + LocalOverWindow( + Expression alias, + List<Expression> partitionBy, + Expression orderBy, + Expression preceding, + @Nullable Expression following) { + this.alias = alias; + this.partitionBy = partitionBy; + this.orderBy = orderBy; + this.preceding = preceding; + this.following = following; + } + + public Expression getAlias() { + return alias; + } + + public List<Expression> getPartitionBy() { + return partitionBy; + } + + public Expression getOrderBy() { + return orderBy; + } + + public Expression getPreceding() { + return preceding; + } + + public Optional<Expression> getFollowing() { + return Optional.ofNullable(following); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java index 015751c..ea45f33 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java @@ -30,7 +30,6 @@ import org.apache.flink.table.expressions.rules.ResolverRules; import org.apache.flink.table.functions.BuiltInFunctionDefinition; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.table.operations.QueryOperation; -import org.apache.flink.table.plan.logical.LogicalOverWindow; import org.apache.flink.table.types.DataType; import org.apache.flink.util.Preconditions; @@ -100,13 +99,13 @@ public class ExpressionResolver { private final Map<String, LocalReferenceExpression> localReferences; - private final Map<Expression, LogicalOverWindow> overWindows; + private final Map<Expression, LocalOverWindow> localOverWindows; private ExpressionResolver( TableReferenceLookup tableLookup, FunctionLookup functionLookup, FieldReferenceLookup fieldLookup, - List<OverWindow> overWindows, + List<OverWindow> localOverWindows, List<LocalReferenceExpression> localReferences) { this.tableLookup = Preconditions.checkNotNull(tableLookup); this.fieldLookup = Preconditions.checkNotNull(fieldLookup); @@ -116,7 +115,7 @@ public class ExpressionResolver { LocalReferenceExpression::getName, Function.identity() )); - this.overWindows = prepareOverWindows(overWindows); + this.localOverWindows = prepareOverWindows(localOverWindows); } /** @@ -187,11 +186,11 @@ public class ExpressionResolver { ); } - private Map<Expression, LogicalOverWindow> prepareOverWindows(List<OverWindow> overWindows) { + private Map<Expression, LocalOverWindow> prepareOverWindows(List<OverWindow> overWindows) { return overWindows.stream() .map(this::resolveOverWindow) .collect(Collectors.toMap( - LogicalOverWindow::alias, + LocalOverWindow::getAlias, Function.identity() )); } @@ -262,18 +261,18 @@ public class ExpressionResolver { } @Override - public Optional<LogicalOverWindow> getOverWindow(Expression alias) { - return Optional.ofNullable(overWindows.get(alias)); + public Optional<LocalOverWindow> getOverWindow(Expression alias) { + return Optional.ofNullable(localOverWindows.get(alias)); } } - private LogicalOverWindow resolveOverWindow(OverWindow overWindow) { - return new LogicalOverWindow( + private LocalOverWindow resolveOverWindow(OverWindow overWindow) { + return new LocalOverWindow( overWindow.getAlias(), prepareExpressions(overWindow.getPartitioning()), resolveFieldsInSingleExpression(overWindow.getOrder()), resolveFieldsInSingleExpression(overWindow.getPreceding()), - overWindow.getFollowing().map(this::resolveFieldsInSingleExpression) + overWindow.getFollowing().map(this::resolveFieldsInSingleExpression).orElse(null) ); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java index fcec1ac..3420437 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java @@ -22,11 +22,11 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor; import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.LocalOverWindow; import org.apache.flink.table.expressions.UnresolvedCallExpression; import org.apache.flink.table.expressions.ValueLiteralExpression; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.table.functions.FunctionDefinition; -import org.apache.flink.table.plan.logical.LogicalOverWindow; import org.apache.flink.table.types.logical.LogicalType; import java.util.ArrayList; @@ -68,17 +68,17 @@ final class OverWindowResolverRule implements ResolverRule { List<Expression> children = unresolvedCall.getChildren(); Expression alias = children.get(1); - LogicalOverWindow referenceWindow = resolutionContext.getOverWindow(alias) + LocalOverWindow referenceWindow = resolutionContext.getOverWindow(alias) .orElseThrow(() -> new ValidationException("Could not resolve over call.")); Expression following = calculateOverWindowFollowing(referenceWindow); List<Expression> newArgs = new ArrayList<>(asList( children.get(0), - referenceWindow.orderBy(), - referenceWindow.preceding(), + referenceWindow.getOrderBy(), + referenceWindow.getPreceding(), following)); - newArgs.addAll(referenceWindow.partitionBy()); + newArgs.addAll(referenceWindow.getPartitionBy()); return unresolvedCall(unresolvedCall.getFunctionDefinition(), newArgs.toArray(new Expression[0])); } else { @@ -90,9 +90,9 @@ final class OverWindowResolverRule implements ResolverRule { } } - private Expression calculateOverWindowFollowing(LogicalOverWindow referenceWindow) { - return referenceWindow.following().orElseGet(() -> { - WindowKind kind = referenceWindow.preceding().accept(OVER_WINDOW_KIND_EXTRACTOR); + private Expression calculateOverWindowFollowing(LocalOverWindow referenceWindow) { + return referenceWindow.getFollowing().orElseGet(() -> { + WindowKind kind = referenceWindow.getPreceding().accept(OVER_WINDOW_KIND_EXTRACTOR); if (kind == WindowKind.ROW) { return unresolvedCall(BuiltInFunctionDefinitions.CURRENT_ROW); } else { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java index af51499..802b550 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java @@ -22,11 +22,11 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.catalog.FunctionLookup; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.ExpressionResolver; +import org.apache.flink.table.expressions.LocalOverWindow; import org.apache.flink.table.expressions.LocalReferenceExpression; import org.apache.flink.table.expressions.lookups.FieldReferenceLookup; import org.apache.flink.table.expressions.lookups.TableReferenceLookup; import org.apache.flink.table.functions.FunctionDefinition; -import org.apache.flink.table.plan.logical.LogicalOverWindow; import java.util.List; import java.util.Optional; @@ -73,8 +73,8 @@ public interface ResolverRule { Optional<LocalReferenceExpression> getLocalReference(String alias); /** - * Lookup for over windows. + * Access to available local over windows. */ - Optional<LogicalOverWindow> getOverWindow(Expression alias); + Optional<LocalOverWindow> getOverWindow(Expression alias); } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala index 50651e9..02b735e 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala @@ -18,20 +18,7 @@ package org.apache.flink.table.plan.logical -import java.util.{Optional, List => JList} - -import org.apache.flink.table.expressions.{Expression, PlannerExpression} - -// ------------------------------------------------------------------------------------------------ -// Over windows -// ------------------------------------------------------------------------------------------------ - -case class LogicalOverWindow( - alias: Expression, - partitionBy: JList[Expression], - orderBy: Expression, - preceding: Expression, - following: Optional[Expression]) +import org.apache.flink.table.expressions.PlannerExpression // ------------------------------------------------------------------------------------------------ // Group windows