This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8c1968ac2880c3784c6c35ffb819932483208807
Author: Timo Walther <twal...@apache.org>
AuthorDate: Mon Jul 1 09:12:41 2019 +0200

    [FLINK-13028][table-api-java] Refactor local over windows
---
 .../flink/table/expressions/LocalOverWindow.java   | 76 ++++++++++++++++++++++
 .../table/expressions/ExpressionResolver.java      | 21 +++---
 .../expressions/rules/OverWindowResolverRule.java  | 16 ++---
 .../table/expressions/rules/ResolverRule.java      |  6 +-
 .../flink/table/plan/logical/groupWindows.scala    | 15 +----
 5 files changed, 98 insertions(+), 36 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java
new file mode 100644
index 0000000..45e1a7a
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Local over window created during expression resolution.
+ */
+@Internal
+public final class LocalOverWindow {
+
+       private Expression alias;
+
+       private List<Expression> partitionBy;
+
+       private Expression orderBy;
+
+       private Expression preceding;
+
+       private @Nullable Expression following;
+
+       LocalOverWindow(
+                       Expression alias,
+                       List<Expression> partitionBy,
+                       Expression orderBy,
+                       Expression preceding,
+                       @Nullable Expression following) {
+               this.alias = alias;
+               this.partitionBy = partitionBy;
+               this.orderBy = orderBy;
+               this.preceding = preceding;
+               this.following = following;
+       }
+
+       public Expression getAlias() {
+               return alias;
+       }
+
+       public List<Expression> getPartitionBy() {
+               return partitionBy;
+       }
+
+       public Expression getOrderBy() {
+               return orderBy;
+       }
+
+       public Expression getPreceding() {
+               return preceding;
+       }
+
+       public Optional<Expression> getFollowing() {
+               return Optional.ofNullable(following);
+       }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
index 015751c..ea45f33 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.expressions.rules.ResolverRules;
 import org.apache.flink.table.functions.BuiltInFunctionDefinition;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.plan.logical.LogicalOverWindow;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
 
@@ -100,13 +99,13 @@ public class ExpressionResolver {
 
        private final Map<String, LocalReferenceExpression> localReferences;
 
-       private final Map<Expression, LogicalOverWindow> overWindows;
+       private final Map<Expression, LocalOverWindow> localOverWindows;
 
        private ExpressionResolver(
                        TableReferenceLookup tableLookup,
                        FunctionLookup functionLookup,
                        FieldReferenceLookup fieldLookup,
-                       List<OverWindow> overWindows,
+                       List<OverWindow> localOverWindows,
                        List<LocalReferenceExpression> localReferences) {
                this.tableLookup = Preconditions.checkNotNull(tableLookup);
                this.fieldLookup = Preconditions.checkNotNull(fieldLookup);
@@ -116,7 +115,7 @@ public class ExpressionResolver {
                        LocalReferenceExpression::getName,
                        Function.identity()
                ));
-               this.overWindows = prepareOverWindows(overWindows);
+               this.localOverWindows = prepareOverWindows(localOverWindows);
        }
 
        /**
@@ -187,11 +186,11 @@ public class ExpressionResolver {
                        );
        }
 
-       private Map<Expression, LogicalOverWindow> 
prepareOverWindows(List<OverWindow> overWindows) {
+       private Map<Expression, LocalOverWindow> 
prepareOverWindows(List<OverWindow> overWindows) {
                return overWindows.stream()
                        .map(this::resolveOverWindow)
                        .collect(Collectors.toMap(
-                               LogicalOverWindow::alias,
+                               LocalOverWindow::getAlias,
                                Function.identity()
                        ));
        }
@@ -262,18 +261,18 @@ public class ExpressionResolver {
                }
 
                @Override
-               public Optional<LogicalOverWindow> getOverWindow(Expression 
alias) {
-                       return Optional.ofNullable(overWindows.get(alias));
+               public Optional<LocalOverWindow> getOverWindow(Expression 
alias) {
+                       return Optional.ofNullable(localOverWindows.get(alias));
                }
        }
 
-       private LogicalOverWindow resolveOverWindow(OverWindow overWindow) {
-               return new LogicalOverWindow(
+       private LocalOverWindow resolveOverWindow(OverWindow overWindow) {
+               return new LocalOverWindow(
                        overWindow.getAlias(),
                        prepareExpressions(overWindow.getPartitioning()),
                        resolveFieldsInSingleExpression(overWindow.getOrder()),
                        
resolveFieldsInSingleExpression(overWindow.getPreceding()),
-                       
overWindow.getFollowing().map(this::resolveFieldsInSingleExpression)
+                       
overWindow.getFollowing().map(this::resolveFieldsInSingleExpression).orElse(null)
                );
        }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
index fcec1ac..3420437 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
@@ -22,11 +22,11 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.LocalOverWindow;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.plan.logical.LogicalOverWindow;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.ArrayList;
@@ -68,17 +68,17 @@ final class OverWindowResolverRule implements ResolverRule {
                                List<Expression> children = 
unresolvedCall.getChildren();
                                Expression alias = children.get(1);
 
-                               LogicalOverWindow referenceWindow = 
resolutionContext.getOverWindow(alias)
+                               LocalOverWindow referenceWindow = 
resolutionContext.getOverWindow(alias)
                                        .orElseThrow(() -> new 
ValidationException("Could not resolve over call."));
 
                                Expression following = 
calculateOverWindowFollowing(referenceWindow);
                                List<Expression> newArgs = new 
ArrayList<>(asList(
                                        children.get(0),
-                                       referenceWindow.orderBy(),
-                                       referenceWindow.preceding(),
+                                       referenceWindow.getOrderBy(),
+                                       referenceWindow.getPreceding(),
                                        following));
 
-                               newArgs.addAll(referenceWindow.partitionBy());
+                               
newArgs.addAll(referenceWindow.getPartitionBy());
 
                                return 
unresolvedCall(unresolvedCall.getFunctionDefinition(), newArgs.toArray(new 
Expression[0]));
                        } else {
@@ -90,9 +90,9 @@ final class OverWindowResolverRule implements ResolverRule {
                        }
                }
 
-               private Expression 
calculateOverWindowFollowing(LogicalOverWindow referenceWindow) {
-                       return referenceWindow.following().orElseGet(() -> {
-                                       WindowKind kind = 
referenceWindow.preceding().accept(OVER_WINDOW_KIND_EXTRACTOR);
+               private Expression calculateOverWindowFollowing(LocalOverWindow 
referenceWindow) {
+                       return referenceWindow.getFollowing().orElseGet(() -> {
+                                       WindowKind kind = 
referenceWindow.getPreceding().accept(OVER_WINDOW_KIND_EXTRACTOR);
                                        if (kind == WindowKind.ROW) {
                                                return 
unresolvedCall(BuiltInFunctionDefinitions.CURRENT_ROW);
                                        } else {
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
index af51499..802b550 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
@@ -22,11 +22,11 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionResolver;
+import org.apache.flink.table.expressions.LocalOverWindow;
 import org.apache.flink.table.expressions.LocalReferenceExpression;
 import org.apache.flink.table.expressions.lookups.FieldReferenceLookup;
 import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
 import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.plan.logical.LogicalOverWindow;
 
 import java.util.List;
 import java.util.Optional;
@@ -73,8 +73,8 @@ public interface ResolverRule {
                Optional<LocalReferenceExpression> getLocalReference(String 
alias);
 
                /**
-                * Lookup for over windows.
+                * Access to available local over windows.
                 */
-               Optional<LogicalOverWindow> getOverWindow(Expression alias);
+               Optional<LocalOverWindow> getOverWindow(Expression alias);
        }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
index 50651e9..02b735e 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
@@ -18,20 +18,7 @@
 
 package org.apache.flink.table.plan.logical
 
-import java.util.{Optional, List => JList}
-
-import org.apache.flink.table.expressions.{Expression, PlannerExpression}
-
-// 
------------------------------------------------------------------------------------------------
-// Over windows
-// 
------------------------------------------------------------------------------------------------
-
-case class LogicalOverWindow(
-    alias: Expression,
-    partitionBy: JList[Expression],
-    orderBy: Expression,
-    preceding: Expression,
-    following: Optional[Expression])
+import org.apache.flink.table.expressions.PlannerExpression
 
 // 
------------------------------------------------------------------------------------------------
 // Group windows

Reply via email to