[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-23 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r277669420
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
 ##
 @@ -39,164 +34,39 @@ case class LogicalOverWindow(
 following: Optional[Expression])
 
 // 

-// Tumbling group windows
+// Group windows
 // 

 
+sealed trait LogicalWindow {
+  def timeAttribute: PlannerExpression
+  def aliasAttribute: PlannerExpression
+}
+
 case class TumblingGroupWindow(
-alias: PlannerExpression,
-timeField: PlannerExpression,
+aliasAttribute: PlannerExpression,
+timeAttribute: PlannerExpression,
 size: PlannerExpression)
-  extends LogicalWindow(
-alias,
-timeField) {
-
-  override def resolveExpressions(
-  resolve: (PlannerExpression) => PlannerExpression): LogicalWindow =
-TumblingGroupWindow(
-  resolve(alias),
-  resolve(timeField),
-  resolve(size))
-
-  override def validate(tableEnv: TableEnvironment): ValidationResult =
-super.validate(tableEnv).orElse(
-  tableEnv match {
-
-// check size
-case _ if !isTimeIntervalLiteral(size) && !isRowCountLiteral(size) =>
-  ValidationFailure(
-"Tumbling window expects size literal of type Interval of 
Milliseconds " +
 
 Review comment:
   where can I find this check for all types of windows?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-23 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r27764
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
 ##
 @@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.GroupWindow;
+import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias;
+import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.AggregateFunctionDefinition;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionBridge;
+import org.apache.flink.table.expressions.ExpressionResolver;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.FunctionDefinition;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
+import org.apache.flink.table.expressions.PlannerExpression;
+import org.apache.flink.table.expressions.ResolvedGroupWindow;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.expressions.WindowReference;
+import org.apache.flink.table.plan.logical.Aggregate;
+import org.apache.flink.table.plan.logical.LogicalNode;
+import org.apache.flink.table.plan.logical.LogicalWindow;
+import org.apache.flink.table.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.plan.logical.TumblingGroupWindow;
+import org.apache.flink.table.plan.logical.WindowAggregate;
+import org.apache.flink.table.typeutils.RowIntervalTypeInfo;
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import scala.Some;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+import static 
org.apache.flink.table.expressions.FunctionDefinition.Type.AGGREGATE_FUNCTION;
+import static 
org.apache.flink.table.expressions.ResolvedGroupWindow.WindowType.SLIDE;
+import static 
org.apache.flink.table.expressions.ResolvedGroupWindow.WindowType.TUMBLE;
+import static 
org.apache.flink.table.typeutils.RowIntervalTypeInfo.INTERVAL_ROWS;
+
+/**
+ * Utility class for creating a valid {@link Aggregate} or {@link 
WindowAggregate}.
+ */
+@Internal
+public class AggregateOperationFactory {
+
+   private final boolean isStreaming;
+   private final ExpressionBridge expressionBridge;
+   private final GroupingExpressionValidator groupingExpressionValidator = 
new GroupingExpressionValidator();
+   private final NoNestedAggregates noNestedAggregates = new 
NoNestedAggregates();
+   private final ValidateDistinct validateDistinct = new 
ValidateDistinct();
+
+   public AggregateOperationFactory(ExpressionBridge 
expressionBridge, boolean isStreaming) {
+   this.expressionBridge = expressionBridge;
+   this.isStreaming = isStreaming;
+   }
+
+   /**
+* Creates a valid {@link Aggregate} operation.
+*
+* @param groupings expressions describing grouping key of aggregates
+* @param aggregates expressions describing aggregation functi

[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-23 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r277656998
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
 ##
 @@ -121,11 +115,13 @@ private CalculatedTable createFunctionCall(
fieldNames = aliases.toArray(new 
String[aliasesSize]);
}
 
-   return new CalculatedTable(
+   TypeInformation[] fieldTypes = 
TableEnvironment$.MODULE$.getFieldTypes(resultType);
 
 Review comment:
   ugly but ok for now :/


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-23 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r277660222
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/TableOperationToRelNodeConverter.java
 ##
 @@ -154,6 +162,39 @@ public RelNode visitOther(TableOperation other) {
throw new TableException("Unknown table operation: " + 
other);
}
 
+   @Override
+   public RelNode visitCalculatedTable(CalculatedTableOperation 
calculatedTable) {
+   String[] fieldNames = 
calculatedTable.getTableSchema().getFieldNames();
+   int[] fieldIndices = IntStream.range(0, 
fieldNames.length).toArray();
+   TypeInformation resultType = 
calculatedTable.getResultType();
+
+   @SuppressWarnings("unchecked")
 
 Review comment:
   Add generics to CalculatedTableOperation to avoid this suppression.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-23 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r277652996
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/TableOperationToRelNodeConverter.java
 ##
 @@ -120,4 +144,28 @@ public RelNode visitOther(TableOperation other) {
.collect(toList());
}
}
+
+   private class AggregateVisitor extends 
ExpressionDefaultVisitor {
+
+   @Override
+   public AggCall visitCall(CallExpression call) {
+   if (call.getFunctionDefinition() == AS) {
+   String aggregateName = 
extractValue(call.getChildren().get(1),
 
 Review comment:
   Should we move `extractValue` and `isFunctionOfType` to `ExpressionUtils` in 
`common`? Because it has nothing to do with API similar to 
`ExpressionDefaultVisitor`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-23 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r277655720
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
 ##
 @@ -147,4 +147,9 @@ public static Expression toRowInterval(Expression e) {
}
return Optional.empty();
}
+
+   public static boolean isFunctionOfType(Expression expr, 
FunctionDefinition.Type type) {
 
 Review comment:
   Replace with this call at other locations as well?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-23 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r277637504
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/WindowAggregateTableOperation.java
 ##
 @@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.operations.WindowAggregateTableOperation.ResolvedGroupWindow.WindowType.SESSION;
+import static 
org.apache.flink.table.operations.WindowAggregateTableOperation.ResolvedGroupWindow.WindowType.SLIDE;
+import static 
org.apache.flink.table.operations.WindowAggregateTableOperation.ResolvedGroupWindow.WindowType.TUMBLE;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ *
 
 Review comment:
   Missing comment.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275319300
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TableOperationDefaultVisitor.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * A utility {@link TableOperationVisitor} that calls
 
 Review comment:
   I think we don't need a default visitor. Only one implementation is excepted 
in the future, no? All implementations should be forced to implement a logic if 
we add new operations.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275319075
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TableOperationVisitor.java
 ##
 @@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Class that implements visitor pattern. It allows type safe logic on top of 
tree
+ * of {@link TableOperation}s.
+ */
+@Internal
+public interface TableOperationVisitor {
 
 Review comment:
   nit: one empty line between first declaration and class/interface 
declaration. I know it is not written in any coding style but this common in 
Java.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275322294
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/TableOperationToRelNodeConverter.java
 ##
 @@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.plan.logical.LogicalNode;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.tools.RelBuilder;
+
+/**
+ * Converter from Flink's specific relational representation: {@link 
TableOperation} to Calcite's specific relational
+ * representation: {@link RelNode}.
+ */
+@Internal
+public class TableOperationToRelNodeConverter extends 
TableOperationDefaultVisitor {
 
 Review comment:
   Move this to `o.a.f.table.plan`? Maybe we can find a shorter name? Maybe 
`TableOperationConverter` similar to `PlannerExpressionConverter`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275319676
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java
 ##
 @@ -68,7 +67,7 @@ public 
ProjectionOperationFactory(ExpressionBridge expression
 
public Project create(
List projectList,
-   LogicalNode childNode,
+   TableOperation childNode,
 
 Review comment:
   rename variable as well ;-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275330244
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/AlgebraicTableOperation.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableSchema;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * An algebraic operation on two relations. It provides a way to union, 
intersect or subtract underlying
+ * data sets/streams. Both relations must have equal schemas.
+ */
+@Internal
+public class AlgebraicTableOperation implements TableOperation {
 
 Review comment:
   looks beautiful :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r27533
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -93,12 +94,19 @@ abstract class TableEnvironment(val config: TableConfig) {
 .typeSystem(new FlinkTypeSystem)
 .operatorTable(getSqlOperatorTable)
 .sqlToRelConverterConfig(getSqlToRelConverterConfig)
+.context(Contexts
 
 Review comment:
   add a comment why this is necessary (aka temporal tables).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275327749
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala
 ##
 @@ -55,8 +56,7 @@ class TableImpl(
 
   var tableName: String = _
 
-  def getRelNode: RelNode = operationTree.asInstanceOf[LogicalNode]
-.toRelNode(tableEnv.getRelBuilder)
+  def getRelNode: RelNode = 
tableEnv.getRelBuilder.flinkTableOperation(operationTree).build()
 
 Review comment:
   rename to `tableOperation`. I would avoid the name of the project in 
identifiers.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275315597
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/JoinOperationFactory.java
 ##
 @@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.operators.join.JoinType;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionBridge;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.PlannerExpression;
+import org.apache.flink.table.plan.logical.CalculatedTable;
+import org.apache.flink.table.plan.logical.Join;
+import org.apache.flink.table.plan.logical.LogicalNode;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+
+/**
+ * Utility class for creating a valid {@link Join} operation.
+ */
+@Internal
+public class JoinOperationFactory {
+
+   private final ExpressionBridge expressionBridge;
+   private final EquiJoinExistsChecker equiJoinExistsChecker = new 
EquiJoinExistsChecker();
+
+   public JoinOperationFactory(ExpressionBridge 
expressionBridge) {
+   this.expressionBridge = expressionBridge;
+   }
+
+   /**
+* Creates a valid {@link Join} operation.
+*
+* It performs validations such as:
+* 
+* condition returns boolean
+* the condition is either always true or contains equi join
+* left and right side of the join do not contain ambiguous column 
names
+* that correlated join is an INNER join
+* 
+*
+* @param left left side of the relational operation
+* @param right right side of the relational operation
+* @param joinType what sort of join to create
+* @param condition join condition to apply
+* @param correlated if the join should be a correlated join
+* @return valid join operation
+*/
+   public LogicalNode create(
+   TableOperation left,
+   TableOperation right,
+   JoinType joinType,
+   Expression condition,
+   boolean correlated) {
+   verifyConditionType(condition);
+   validateNamesAmbiguity(left, right);
+   validateCondition(right, joinType, condition, correlated);
+
+   PlannerExpression plannerExpression = 
expressionBridge.bridge(condition);
+   return new Join((LogicalNode) left, (LogicalNode) right, 
joinType, Optional.of(plannerExpression), correlated);
+   }
+
+   private void validateCondition(TableOperation right, JoinType joinType, 
Expression condition, boolean correlated) {
+   boolean alwaysTrue = ApiExpressionUtils.extractValue(condition, 
Types.BOOLEAN).orElse(false);
+
+   if (alwaysTrue) {
+   return;
+   }
+
+   Boolean equiJoinExists = 
condition.accept(equiJoinExistsChecker);
+   if (correlated && right instanceof CalculatedTable && joinType 
!= JoinType.INNER) {
+   throw new ValidationException("TableFunction left outer 
join predicate can only be empty or literal true.");
 
 Review comment:
   I think we are calling it "lateral join with a table function" now, right?


This is an automated message from the

[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275315193
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/JoinOperationFactory.java
 ##
 @@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.operators.join.JoinType;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionBridge;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.PlannerExpression;
+import org.apache.flink.table.plan.logical.CalculatedTable;
+import org.apache.flink.table.plan.logical.Join;
+import org.apache.flink.table.plan.logical.LogicalNode;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+
+/**
+ * Utility class for creating a valid {@link Join} operation.
+ */
+@Internal
+public class JoinOperationFactory {
+
+   private final ExpressionBridge expressionBridge;
+   private final EquiJoinExistsChecker equiJoinExistsChecker = new 
EquiJoinExistsChecker();
+
+   public JoinOperationFactory(ExpressionBridge 
expressionBridge) {
+   this.expressionBridge = expressionBridge;
+   }
+
+   /**
+* Creates a valid {@link Join} operation.
+*
+* It performs validations such as:
+* 
+* condition returns boolean
+* the condition is either always true or contains equi join
+* left and right side of the join do not contain ambiguous column 
names
+* that correlated join is an INNER join
+* 
+*
+* @param left left side of the relational operation
+* @param right right side of the relational operation
+* @param joinType what sort of join to create
+* @param condition join condition to apply
+* @param correlated if the join should be a correlated join
+* @return valid join operation
+*/
+   public LogicalNode create(
+   TableOperation left,
+   TableOperation right,
+   JoinType joinType,
+   Expression condition,
+   boolean correlated) {
+   verifyConditionType(condition);
+   validateNamesAmbiguity(left, right);
+   validateCondition(right, joinType, condition, correlated);
+
+   PlannerExpression plannerExpression = 
expressionBridge.bridge(condition);
+   return new Join((LogicalNode) left, (LogicalNode) right, 
joinType, Optional.of(plannerExpression), correlated);
+   }
+
+   private void validateCondition(TableOperation right, JoinType joinType, 
Expression condition, boolean correlated) {
+   boolean alwaysTrue = ApiExpressionUtils.extractValue(condition, 
Types.BOOLEAN).orElse(false);
+
+   if (alwaysTrue) {
+   return;
+   }
+
+   Boolean equiJoinExists = 
condition.accept(equiJoinExistsChecker);
+   if (correlated && right instanceof CalculatedTable && joinType 
!= JoinType.INNER) {
 
 Review comment:
   Introduce our own join types enum. We should not use enums of other APIs.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For quer

[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275312046
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AlgebraicOperationFactory.java
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.plan.logical.Intersect;
+import org.apache.flink.table.plan.logical.LogicalNode;
+import org.apache.flink.table.plan.logical.Minus;
+import org.apache.flink.table.plan.logical.Union;
+
+import java.util.stream.IntStream;
+
+import static 
org.apache.flink.table.operations.AlgebraicOperationFactory.AlgebraicTableOperationType.UNION;
+
+/**
+ * Utility class for creating a valid algebraic operation such as {@link 
Minus}, {@link Intersect} or {@link Union}.
+ */
+@Internal
+public class AlgebraicOperationFactory {
 
 Review comment:
   Call this `SetOperation` instead (also in comments and related classes) to 
be consistent with tests, rules, etc.
   
   https://en.wikipedia.org/wiki/Set_operations_(SQL)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275313133
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AlgebraicOperationFactory.java
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.plan.logical.Intersect;
+import org.apache.flink.table.plan.logical.LogicalNode;
+import org.apache.flink.table.plan.logical.Minus;
+import org.apache.flink.table.plan.logical.Union;
+
+import java.util.stream.IntStream;
+
+import static 
org.apache.flink.table.operations.AlgebraicOperationFactory.AlgebraicTableOperationType.UNION;
+
+/**
+ * Utility class for creating a valid algebraic operation such as {@link 
Minus}, {@link Intersect} or {@link Union}.
+ */
+@Internal
+public class AlgebraicOperationFactory {
+
+   private final boolean isStreaming;
+
+   /**
+* Describes what kind of operation should be created.
+*/
+   public enum AlgebraicTableOperationType {
+   INTERSECT,
+   MINUS,
+   UNION
+   }
+
+   public AlgebraicOperationFactory(boolean isStreaming) {
+   this.isStreaming = isStreaming;
+   }
+
+   /**
+* Creates a valid algebraic operation.
+*
+* @param type type of operation to create
+* @param left first relational operation of the operation
+* @param right second relational operation of the operation
+* @param all flag defining how duplicates should be handled
+* @return creates a valid algebraic operation
+*/
+   public TableOperation create(
+   AlgebraicTableOperationType type,
+   TableOperation left,
+   TableOperation right,
+   boolean all) {
+   LogicalNode leftNode = (LogicalNode) left;
+   LogicalNode rightNode = (LogicalNode) right;
+   failIfStreaming(type, all);
+   validateAlgebraicOperation(type, leftNode, right);
+   switch (type) {
+   case INTERSECT:
+   return new Intersect(leftNode, rightNode, all);
+   case MINUS:
+   return new Minus(leftNode, rightNode, all);
+   case UNION:
+   return new Union(leftNode, rightNode, all);
+   }
+   throw new TableException("Unknown algebraic operation type: " + 
type);
+   }
+
+   private void validateAlgebraicOperation(
+   AlgebraicTableOperationType operationType,
+   TableOperation left,
+   TableOperation right) {
+   TableSchema leftSchema = left.getTableSchema();
+   int leftFieldCount = leftSchema.getFieldCount();
+   TableSchema rightSchema = right.getTableSchema();
+   int rightFieldCount = rightSchema.getFieldCount();
+
+   if (leftFieldCount != rightFieldCount) {
+   throw new ValidationException(String.format("%s two 
table of different column sizes:" +
+   " %d and %d", operationType, leftFieldCount, 
rightFieldCount));
+   }
+
+   TypeInformation[] leftFieldTypes = 
leftSchema.getFieldTypes();
+   TypeInformation[] rightFieldTypes = 
rightSchema.getFieldTypes();
+   boolean sameSchema = IntStream.range(0, leftFieldCount)
+   .allMatch(idx -> 
leftFieldTypes[idx].equals(rightFieldTypes[idx]));
+
+   if (!sameSchema) {
+   throw new ValidationException(String.format("%s two 
tab

[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275309975
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
 ##
 @@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.AggregateFunctionDefinition;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionBridge;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.FunctionDefinition;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
+import org.apache.flink.table.expressions.PlannerExpression;
+import org.apache.flink.table.plan.logical.Aggregate;
+import org.apache.flink.table.plan.logical.LogicalNode;
+import org.apache.flink.table.plan.logical.LogicalWindow;
+import org.apache.flink.table.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.plan.logical.TumblingGroupWindow;
+import org.apache.flink.table.plan.logical.WindowAggregate;
+import org.apache.flink.table.typeutils.RowIntervalTypeInfo;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.expressions.FunctionDefinition.Type.AGGREGATE_FUNCTION;
+
+/**
+ * Utility class for creating a valid {@link Aggregate} or {@link 
WindowAggregate}.
+ */
+@Internal
+public class AggregateOperationFactory {
+
+   private final boolean isStreaming;
+   private final ExpressionBridge expressionBridge;
+   private final GroupingExpressionValidator groupingExpressionValidator = 
new GroupingExpressionValidator();
+   private final NoChainedAggregates noChainedAggregates = new 
NoChainedAggregates();
+   private final ValidateDistinct validateDistinct = new 
ValidateDistinct();
+
+   public AggregateOperationFactory(ExpressionBridge 
expressionBridge, boolean isStreaming) {
+   this.expressionBridge = expressionBridge;
+   this.isStreaming = isStreaming;
+   }
+
+   /**
+* Creates a valid {@link Aggregate} operation.
+*
+* @param groupings expressions describing grouping key of aggregates
+* @param aggregates expressions describing aggregation functions
+* @param child relational operation on top of which to apply the 
aggregation
+* @return valid aggregate operation
+*/
+   public Aggregate createAggregate(
+   List groupings,
+   List aggregates,
+   TableOperation child) {
+
+   LogicalNode childNode = (LogicalNode) child;
+   validateGroupings(groupings);
+   validateAggregates(groupings, aggregates);
+
+   List convertedGroupings = bridge(groupings);
+   List convertedAggregates = 
bridge(aggregates);
+   return new Aggregate(convertedGroupings, convertedAggregates, 
childNode);
+   }
+
+   /**
+* Creates a valid {@link WindowAggregate} operation.
+*
+* @param groupings expressions describing grouping key of aggregates
+* @param aggregates expressions describing aggregation functions
+* @param windowProperties expressions describing window properties
+* @param window grouping window of this aggregation
+* @param child relational operation on top of which to apply the 
aggregation
+* @return valid window aggregate operation
+*/
+   public WindowAggregate createWindowAggregate(
+   List groupings,
+   List aggregates,
+

[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275310522
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
 ##
 @@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.AggregateFunctionDefinition;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionBridge;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.FunctionDefinition;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
+import org.apache.flink.table.expressions.PlannerExpression;
+import org.apache.flink.table.plan.logical.Aggregate;
+import org.apache.flink.table.plan.logical.LogicalNode;
+import org.apache.flink.table.plan.logical.LogicalWindow;
+import org.apache.flink.table.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.plan.logical.TumblingGroupWindow;
+import org.apache.flink.table.plan.logical.WindowAggregate;
+import org.apache.flink.table.typeutils.RowIntervalTypeInfo;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.expressions.FunctionDefinition.Type.AGGREGATE_FUNCTION;
+
+/**
+ * Utility class for creating a valid {@link Aggregate} or {@link 
WindowAggregate}.
+ */
+@Internal
+public class AggregateOperationFactory {
+
+   private final boolean isStreaming;
+   private final ExpressionBridge expressionBridge;
+   private final GroupingExpressionValidator groupingExpressionValidator = 
new GroupingExpressionValidator();
+   private final NoChainedAggregates noChainedAggregates = new 
NoChainedAggregates();
+   private final ValidateDistinct validateDistinct = new 
ValidateDistinct();
+
+   public AggregateOperationFactory(ExpressionBridge 
expressionBridge, boolean isStreaming) {
+   this.expressionBridge = expressionBridge;
+   this.isStreaming = isStreaming;
+   }
+
+   /**
+* Creates a valid {@link Aggregate} operation.
+*
+* @param groupings expressions describing grouping key of aggregates
+* @param aggregates expressions describing aggregation functions
+* @param child relational operation on top of which to apply the 
aggregation
+* @return valid aggregate operation
+*/
+   public Aggregate createAggregate(
+   List groupings,
+   List aggregates,
+   TableOperation child) {
+
+   LogicalNode childNode = (LogicalNode) child;
+   validateGroupings(groupings);
+   validateAggregates(groupings, aggregates);
+
+   List convertedGroupings = bridge(groupings);
+   List convertedAggregates = 
bridge(aggregates);
+   return new Aggregate(convertedGroupings, convertedAggregates, 
childNode);
+   }
+
+   /**
+* Creates a valid {@link WindowAggregate} operation.
+*
+* @param groupings expressions describing grouping key of aggregates
+* @param aggregates expressions describing aggregation functions
+* @param windowProperties expressions describing window properties
+* @param window grouping window of this aggregation
+* @param child relational operation on top of which to apply the 
aggregation
+* @return valid window aggregate operation
+*/
+   public WindowAggregate createWindowAggregate(
+   List groupings,
+   List aggregates,
+

[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275307566
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
 ##
 @@ -50,43 +48,35 @@ case class TumblingGroupWindow(
 alias,
 timeField) {
 
-  override def resolveExpressions(
-  resolve: (PlannerExpression) => PlannerExpression): LogicalWindow =
-TumblingGroupWindow(
-  resolve(alias),
-  resolve(timeField),
-  resolve(size))
+  override def validate(isStreaming: Boolean): Unit =
 
-  override def validate(tableEnv: TableEnvironment): ValidationResult =
-super.validate(tableEnv).orElse(
-  tableEnv match {
+  isStreaming match {
 
 // check size
 case _ if !isTimeIntervalLiteral(size) && !isRowCountLiteral(size) =>
-  ValidationFailure(
+  throw new ValidationException(
 "Tumbling window expects size literal of type Interval of 
Milliseconds " +
   "or Interval of Rows.")
 
 // check time attribute
-case _: StreamTableEnvironment if !isTimeAttribute(timeField) =>
-  ValidationFailure(
+case true if !isTimeAttribute(timeField) =>
 
 Review comment:
   use a regular if/else statement


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275308796
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
 ##
 @@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.AggregateFunctionDefinition;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionBridge;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.FunctionDefinition;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
+import org.apache.flink.table.expressions.PlannerExpression;
+import org.apache.flink.table.plan.logical.Aggregate;
+import org.apache.flink.table.plan.logical.LogicalNode;
+import org.apache.flink.table.plan.logical.LogicalWindow;
+import org.apache.flink.table.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.plan.logical.TumblingGroupWindow;
+import org.apache.flink.table.plan.logical.WindowAggregate;
+import org.apache.flink.table.typeutils.RowIntervalTypeInfo;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.expressions.FunctionDefinition.Type.AGGREGATE_FUNCTION;
+
+/**
+ * Utility class for creating a valid {@link Aggregate} or {@link 
WindowAggregate}.
+ */
+@Internal
+public class AggregateOperationFactory {
+
+   private final boolean isStreaming;
+   private final ExpressionBridge expressionBridge;
+   private final GroupingExpressionValidator groupingExpressionValidator = 
new GroupingExpressionValidator();
+   private final NoChainedAggregates noChainedAggregates = new 
NoChainedAggregates();
+   private final ValidateDistinct validateDistinct = new 
ValidateDistinct();
+
+   public AggregateOperationFactory(ExpressionBridge 
expressionBridge, boolean isStreaming) {
+   this.expressionBridge = expressionBridge;
+   this.isStreaming = isStreaming;
+   }
+
+   /**
+* Creates a valid {@link Aggregate} operation.
+*
+* @param groupings expressions describing grouping key of aggregates
+* @param aggregates expressions describing aggregation functions
+* @param child relational operation on top of which to apply the 
aggregation
+* @return valid aggregate operation
+*/
+   public Aggregate createAggregate(
+   List groupings,
+   List aggregates,
+   TableOperation child) {
+
+   LogicalNode childNode = (LogicalNode) child;
+   validateGroupings(groupings);
+   validateAggregates(groupings, aggregates);
+
+   List convertedGroupings = bridge(groupings);
+   List convertedAggregates = 
bridge(aggregates);
+   return new Aggregate(convertedGroupings, convertedAggregates, 
childNode);
+   }
+
+   /**
+* Creates a valid {@link WindowAggregate} operation.
+*
+* @param groupings expressions describing grouping key of aggregates
+* @param aggregates expressions describing aggregation functions
+* @param windowProperties expressions describing window properties
+* @param window grouping window of this aggregation
+* @param child relational operation on top of which to apply the 
aggregation
+* @return valid window aggregate operation
+*/
+   public WindowAggregate createWindowAggregate(
+   List groupings,
+   List aggregates,
+

[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275307872
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
 ##
 @@ -104,54 +94,45 @@ case class SlidingGroupWindow(
 alias,
 timeField) {
 
-  override def resolveExpressions(
-  resolve: (PlannerExpression) => PlannerExpression): LogicalWindow =
-SlidingGroupWindow(
-  resolve(alias),
-  resolve(timeField),
-  resolve(size),
-  resolve(slide))
-
-  override def validate(tableEnv: TableEnvironment): ValidationResult =
-super.validate(tableEnv).orElse(
-  tableEnv match {
+  override def validate(isStreaming: Boolean): Unit =
+isStreaming match {
 
 // check size
 case _ if !isTimeIntervalLiteral(size) && !isRowCountLiteral(size) =>
-  ValidationFailure(
+  throw new ValidationException(
 "Sliding window expects size literal of type Interval of 
Milliseconds " +
   "or Interval of Rows.")
 
 // check slide
 case _ if !isTimeIntervalLiteral(slide) && !isRowCountLiteral(slide) =>
-  ValidationFailure(
+  throw new ValidationException(
 "Sliding window expects slide literal of type Interval of 
Milliseconds " +
   "or Interval of Rows.")
 
 // check same type of intervals
 case _ if isTimeIntervalLiteral(size) != isTimeIntervalLiteral(slide) 
=>
-  ValidationFailure("Sliding window expects same type of size and 
slide.")
+  throw new ValidationException("Sliding window expects same type of 
size and slide.")
 
 // check time attribute
-case _: StreamTableEnvironment if !isTimeAttribute(timeField) =>
-  ValidationFailure(
+case true if !isTimeAttribute(timeField) =>
+  throw new ValidationException(
 "Sliding window expects a time attribute for grouping in a stream 
environment.")
-case _: BatchTableEnvironment
+case false
   if !(isTimePoint(timeField.resultType) || 
isLong(timeField.resultType)) =>
-  ValidationFailure(
+  throw new ValidationException(
 "Sliding window expects a time attribute for grouping in a stream 
environment.")
 
 // check row intervals on event-time
-case _: StreamTableEnvironment
+case true
 if isRowCountLiteral(size) && isRowtimeAttribute(timeField) =>
-  ValidationFailure(
+  throw new ValidationException(
 "Event-time grouping windows on row intervals in a stream 
environment " +
   "are currently not supported.")
 
 case _ =>
-  ValidationSuccess
+  // validation successful
   }
-)
+
 
 Review comment:
   nit: remove empty line


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275307745
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
 ##
 @@ -30,14 +29,12 @@ import org.apache.flink.table.validate.{ValidationFailure, 
ValidationResult, Val
   */
 abstract class LogicalWindow(
 val aliasAttribute: PlannerExpression,
-val timeAttribute: PlannerExpression)
-  extends Resolvable[LogicalWindow] {
+val timeAttribute: PlannerExpression) {
 
-  def resolveExpressions(resolver: (PlannerExpression) => PlannerExpression): 
LogicalWindow = this
-
-  def validate(tableEnv: TableEnvironment): ValidationResult = aliasAttribute 
match {
-case WindowReference(_, _) => ValidationSuccess
-case _ => ValidationFailure("Window reference for window expected.")
+  def validate(isStreaming: Boolean): Unit = aliasAttribute match {
+case WindowReference(_, _) =>
 
 Review comment:
   add a comment here, first I thought it is a fall-through


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275290056
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
 ##
 @@ -86,7 +85,7 @@ class CorrelateValidationTest extends TableTestBase {
 util.addFunction("func2", new TableFunc2)
 expectExceptionThrown(
   t.joinLateral("func2(c, c)"),
-  "Given parameters of function 'func2' do not match any signature")
+  "Invalid arguments [[c, c]] for call: func2(c, c)")
 
 Review comment:
   Is this exception useful? And also `[[c, c]]` and not `[c, c]`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275289448
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
 ##
 @@ -156,12 +156,12 @@ class TemporalTableJoinTest extends TableTestBase {
   inputRates: TemporalTableFunction,
   proctime: Boolean = false): Unit = {
 val rates = inputRates.asInstanceOf[TemporalTableFunctionImpl]
-assertTrue(rates.getPrimaryKey.isInstanceOf[ResolvedFieldReference])
-assertEquals("currency", 
rates.getPrimaryKey.asInstanceOf[ResolvedFieldReference].name)
-assertTrue(rates.getTimeAttribute.isInstanceOf[ResolvedFieldReference])
+assertTrue(rates.getPrimaryKey.isInstanceOf[FieldReferenceExpression])
 
 Review comment:
   nit: use object equality instead of casting. Just construct the expression 
that you except.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275275067
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/SortOperationFactory.java
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionBridge;
+import org.apache.flink.table.expressions.PlannerExpression;
+import org.apache.flink.table.plan.logical.Limit;
+import org.apache.flink.table.plan.logical.LogicalNode;
+import org.apache.flink.table.plan.logical.Sort;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.flink.table.expressions.BuiltInFunctionDefinitions.ORDERING;
+import static 
org.apache.flink.table.expressions.BuiltInFunctionDefinitions.ORDER_ASC;
+
+/**
+ * Utility class for creating a valid {@link Sort} operation.
+ */
+@Internal
+public class SortOperationFactory {
+
+   private final boolean isStreaming;
+   private final ExpressionBridge expressionBridge;
+   private final OrderWrapper orderWrapper = new OrderWrapper();
+
+   public SortOperationFactory(
+   ExpressionBridge expressionBridge,
+   boolean isStreaming) {
+   this.expressionBridge = expressionBridge;
+   this.isStreaming = isStreaming;
+   }
+
+   /**
+* Creates a valid {@link Sort} operation.
+*
+* NOTE: if the collation is not explicitly specified for any 
expression, it is wrapped in a
+* default ascending order
+*
+* @param orders expressions describing order,
+* @param child relational expression on top of which to apply the sort 
operation
+* @return valid sort operation
+*/
+   public Sort createSort(List orders, TableOperation child) {
+   failIfStreaming();
+
+   List convertedOrders = orders.stream()
+   .map(f -> f.accept(orderWrapper))
+   .map(expressionBridge::bridge)
+   .collect(Collectors.toList());
+   return new Sort(convertedOrders, (LogicalNode) child);
+   }
+
+   /**
+* Creates a valid {@link Limit} operation.
+*
+* @param offset offset to start from
+* @param fetch number of records to fetch
+* @return valid limit operation
+*/
+   public Limit createLimit(int offset, int fetch, TableOperation child) {
+   failIfStreaming();
+
+   if (!(child instanceof Sort)) {
+   throw new ValidationException("Limit operator must be 
preceded by an OrderBy operator.");
+   }
+   if (offset < 0) {
+   throw new ValidationException("Offset should be greater 
than or equal to zero.");
+   }
+
+   return new Limit(offset, fetch, (LogicalNode) child);
+   }
+
+   private void failIfStreaming() {
+   if (isStreaming) {
+   throw new ValidationException("Limit on stream tables 
is currently not supported.");
 
 Review comment:
   nit: rename exception to "A limit operation on unbounded tables is currently 
not supported."
   
   we should be consistent with the concepts.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275273674
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/SortOperationFactory.java
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionBridge;
+import org.apache.flink.table.expressions.PlannerExpression;
+import org.apache.flink.table.plan.logical.Limit;
+import org.apache.flink.table.plan.logical.LogicalNode;
+import org.apache.flink.table.plan.logical.Sort;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.flink.table.expressions.BuiltInFunctionDefinitions.ORDERING;
+import static 
org.apache.flink.table.expressions.BuiltInFunctionDefinitions.ORDER_ASC;
+
+/**
+ * Utility class for creating a valid {@link Sort} operation.
+ */
+@Internal
+public class SortOperationFactory {
+
+   private final boolean isStreaming;
+   private final ExpressionBridge expressionBridge;
+   private final OrderWrapper orderWrapper = new OrderWrapper();
+
+   public SortOperationFactory(
+   ExpressionBridge expressionBridge,
+   boolean isStreaming) {
+   this.expressionBridge = expressionBridge;
+   this.isStreaming = isStreaming;
+   }
+
+   /**
+* Creates a valid {@link Sort} operation.
+*
+* NOTE: if the collation is not explicitly specified for any 
expression, it is wrapped in a
+* default ascending order
+*
+* @param orders expressions describing order,
+* @param child relational expression on top of which to apply the sort 
operation
+* @return valid sort operation
+*/
+   public Sort createSort(List orders, TableOperation child) {
+   failIfStreaming();
+
+   List convertedOrders = orders.stream()
+   .map(f -> f.accept(orderWrapper))
+   .map(expressionBridge::bridge)
+   .collect(Collectors.toList());
+   return new Sort(convertedOrders, (LogicalNode) child);
+   }
+
+   /**
+* Creates a valid {@link Limit} operation.
+*
+* @param offset offset to start from
+* @param fetch number of records to fetch
+* @return valid limit operation
+*/
+   public Limit createLimit(int offset, int fetch, TableOperation child) {
+   failIfStreaming();
+
+   if (!(child instanceof Sort)) {
+   throw new ValidationException("Limit operator must be 
preceded by an OrderBy operator.");
 
 Review comment:
   nit: rename exception to "A limit operation must be preceded by a sort 
operation."? The case class names are not valid anymore and were misleading 
anyway.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275270486
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Utility class for creating valid alias expressions that can be later used 
as a projection.
+ */
+@Internal
+public final class AliasOperationUtils {
+
+   private static final AliasLiteralValidator aliasLiteralValidator = new 
AliasLiteralValidator();
+
+   /**
+* Creates a list of valid alias expressions. Resulting expression 
might still contain
+* {@link UnresolvedReferenceExpression}.
+*
+* @param aliases aliases to validate
+* @param child relational operation on top of which to apply the 
aliases
+* @return validated list of aliases
+*/
+   public static List createAliasList(List 
aliases, TableOperation child) {
+   TableSchema childSchema = child.getTableSchema();
+
+   if (aliases.size() > childSchema.getFieldCount()) {
+   throw new ValidationException("Aliasing more fields 
than we actually have");
+   }
+
+   List fieldAliases = aliases.stream()
+   .map(f -> f.accept(aliasLiteralValidator))
+   .collect(Collectors.toList());
+
+   String[] childNames = childSchema.getFieldNames();
+   return IntStream.range(0, childNames.length)
+   .mapToObj(idx -> {
+   UnresolvedReferenceExpression oldField = new 
UnresolvedReferenceExpression(childNames[idx]);
+   if (idx < fieldAliases.size()) {
+   ValueLiteralExpression alias = 
fieldAliases.get(idx);
+   return new 
CallExpression(BuiltInFunctionDefinitions.AS, Arrays.asList(oldField, alias));
+   } else {
+   return oldField;
+   }
+   }).collect(Collectors.toList());
+   }
+
+   private static class AliasLiteralValidator extends 
ApiExpressionDefaultVisitor {
+
+   @Override
+   public ValueLiteralExpression 
visitValueLiteral(ValueLiteralExpression valueLiteralExpression) {
+   String name = 
ApiExpressionUtils.extractValue(valueLiteralExpression, Types.STRING())
+   .orElseThrow(() -> new ValidationException(
+   "Alias only accept name expressions as 
arguments that is not '*'"));
+
+   if (name.equals("*")) {
 
 Review comment:
   extract '*' into a constant?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-15 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r275271583
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Utility class for creating valid alias expressions that can be later used 
as a projection.
+ */
+@Internal
+public final class AliasOperationUtils {
+
+   private static final AliasLiteralValidator aliasLiteralValidator = new 
AliasLiteralValidator();
+
+   /**
+* Creates a list of valid alias expressions. Resulting expression 
might still contain
+* {@link UnresolvedReferenceExpression}.
+*
+* @param aliases aliases to validate
+* @param child relational operation on top of which to apply the 
aliases
+* @return validated list of aliases
+*/
+   public static List createAliasList(List 
aliases, TableOperation child) {
+   TableSchema childSchema = child.getTableSchema();
+
+   if (aliases.size() > childSchema.getFieldCount()) {
+   throw new ValidationException("Aliasing more fields 
than we actually have");
+   }
+
+   List fieldAliases = aliases.stream()
+   .map(f -> f.accept(aliasLiteralValidator))
+   .collect(Collectors.toList());
+
+   String[] childNames = childSchema.getFieldNames();
+   return IntStream.range(0, childNames.length)
+   .mapToObj(idx -> {
+   UnresolvedReferenceExpression oldField = new 
UnresolvedReferenceExpression(childNames[idx]);
+   if (idx < fieldAliases.size()) {
+   ValueLiteralExpression alias = 
fieldAliases.get(idx);
+   return new 
CallExpression(BuiltInFunctionDefinitions.AS, Arrays.asList(oldField, alias));
+   } else {
+   return oldField;
+   }
+   }).collect(Collectors.toList());
+   }
+
+   private static class AliasLiteralValidator extends 
ApiExpressionDefaultVisitor {
+
+   @Override
+   public ValueLiteralExpression 
visitValueLiteral(ValueLiteralExpression valueLiteralExpression) {
+   String name = 
ApiExpressionUtils.extractValue(valueLiteralExpression, Types.STRING())
+   .orElseThrow(() -> new ValidationException(
+   "Alias only accept name expressions as 
arguments that is not '*'"));
 
 Review comment:
   Could you update the exception messages? Also add a dot at the end and fix 
grammar mistakes. What are `name expressions`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273130779
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
 ##
 @@ -71,15 +98,22 @@ class PlannerExpressionConverter private extends 
ApiExpressionVisitor[PlannerExp
 val extraNames = args
   .drop(2)
   .map(e => getValue[String](e))
-Alias(args.head, name, extraNames)
+if (extraNames.nonEmpty) {
+  TableAlias(args.head, name, extraNames)
 
 Review comment:
   I agree with Hequn. Maybe we can avoid this change.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273114839
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/OperationTreeBuilder.scala
 ##
 @@ -37,6 +43,25 @@ import _root_.scala.collection.JavaConverters._
 class OperationTreeBuilder(private val tableEnv: TableEnvironment) {
 
   private val expressionBridge: ExpressionBridge[PlannerExpression] = 
tableEnv.expressionBridge
+  private val columnOperationsFactory = new ColumnOperationsFactory
 
 Review comment:
   This is not really a factory and can just be a util class with static 
methods right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273101387
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/WrapInOrderRule.java
 ##
 @@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.rules;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FunctionDefinition;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.flink.table.expressions.BuiltInFunctionDefinitions.ORDER_ASC;
+import static 
org.apache.flink.table.expressions.BuiltInFunctionDefinitions.ORDER_DESC;
+
+/**
+ * Makes sure that all expressions are wrapped in ordering expression. If the 
expression is either
+ * {@link 
org.apache.flink.table.expressions.BuiltInFunctionDefinitions#ORDER_ASC} or
+ * {@link 
org.apache.flink.table.expressions.BuiltInFunctionDefinitions#ORDER_DESC} it 
does nothing, otherwise
+ * it inserts {@link 
org.apache.flink.table.expressions.BuiltInFunctionDefinitions#ORDER_ASC}.
+ */
+@Internal
+final class WrapInOrderRule implements ResolverRule {
 
 Review comment:
   This logic belongs more to the tree builder than expression resolver.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273049948
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
 ##
 @@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.GroupWindow;
+import org.apache.flink.table.api.OverWindow;
+import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias;
+import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.lookups.FieldReferenceLookup;
+import org.apache.flink.table.expressions.lookups.OperationFieldLookup;
+import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
+import org.apache.flink.table.expressions.rules.ResolverRule;
+import org.apache.flink.table.expressions.rules.ResolverRules;
+import org.apache.flink.table.operations.TableOperation;
+import org.apache.flink.table.plan.logical.LogicalOverWindow;
+import org.apache.flink.table.plan.logical.LogicalWindow;
+import org.apache.flink.table.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.plan.logical.TumblingGroupWindow;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Some;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Tries to resolve all unresolved expressions such as {@link 
UnresolvedReferenceExpression}
+ * or calls such as {@link BuiltInFunctionDefinitions#OVER}.
+ *
+ * The default set of rules ({@link 
ExpressionResolverBuilder#getDefaultRules()}) will resolve following references:
+ * 
+ * flatten '*' to all fields of underlying inputs
+ * join over aggregates with corresponding over windows into a single 
resolved call
+ * resolve remaining unresolved references to fields, tables or local 
references
+ * replace call to {@link BuiltInFunctionDefinitions#FLATTEN}
+ * performs call arguments types validation and inserts additional 
casts if possible
+ * 
+ */
+public class ExpressionResolver {
+
+   private final PlannerExpressionConverter bridgeConverter = 
PlannerExpressionConverter.INSTANCE();
+
+   private final FieldReferenceLookup fieldLookup;
+
+   private final TableReferenceLookup tableLookup;
+
+   //TODO change to LocalReferenceLookup, once we don't need to resolve 
fields to create LocalReferenceExpression
+   private final Map localReferences = 
new HashMap<>();
+
+   private final Map overWindows;
+
+   private final Function, List> 
resolveFunction;
+
+   private ExpressionResolver(
+   TableReferenceLookup tableLookup,
+   FieldReferenceLookup fieldLookup,
+   List overWindows,
+   @Nullable GroupWindow groupWindow,
+   List rules) {
+   this.tableLookup = Preconditions.checkNotNull(tableLookup);
+   this.fieldLookup = Preconditions.checkNotNull(fieldLookup);
+   this.resolveFunction = concatenateRules(rules);
+
+   this.overWindows = prepareOverWindows(overWindows);
+   prepareLocalReferencesFromGroupWindows(groupWindow);
+   }
+
+   /**
+* Creates a builder for {@link ExpressionResolver}. One can add 
additional properties to the resolver
+* like e.g. {@link GroupWindow} or {@link OverWindow}. You can also 

[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273100967
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/StarReferenceFlatteningRule.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.rules;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * Replaces '*' with all available {@link 
org.apache.flink.table.expressions.FieldReferenceExpression}s from underlying
+ * inputs.
+ */
+@Internal
+final class StarReferenceFlatteningRule implements ResolverRule {
+
+   @Override
+   public List apply(List expression, 
ResolutionContext context) {
+   return expression.stream()
+   .flatMap(expr -> expr.accept(new 
FieldFlatteningVisitor(context)).stream())
+   .collect(Collectors.toList());
+   }
+
+   private static class FieldFlatteningVisitor extends 
ApiExpressionDefaultVisitor> {
+
+   private final ResolutionContext resolutionContext;
 
 Review comment:
   Maybe introduce a helper class to reduce a bit of code duplication for every 
rule visitor.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273117051
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
 ##
 @@ -18,22 +18,25 @@
 
 package org.apache.flink.table.plan.logical
 
+import java.util.Optional
+import java.util.{List => JList}
+
 import org.apache.flink.table.api.{BatchTableEnvironment, 
StreamTableEnvironment, TableEnvironment}
 import 
org.apache.flink.table.expressions.PlannerExpressionUtils.{isRowCountLiteral, 
isRowtimeAttribute, isTimeAttribute, isTimeIntervalLiteral}
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.typeutils.TypeCheckUtils.{isTimePoint, isLong}
+import org.apache.flink.table.typeutils.TypeCheckUtils.{isLong, isTimePoint}
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, 
ValidationSuccess}
 
 // 

 // Over windows
 // 

 
 case class LogicalOverWindow(
 
 Review comment:
   Make this file a POJO in `o.a.f.t.operations` in a separate commit? We need 
some representation of windows as output of the API for the planner. Must not 
necessarily containing expressions only. Or only resolved expressions. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273119314
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
 ##
 @@ -47,67 +47,60 @@ class CalcValidationTest extends TableTestBase {
 .select('w.end.rowtime, 'int.count as 'int) // no rowtime on non-window 
reference
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testAddColumnsWithAgg(): Unit = {
 val util = streamTestUtil()
 val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 tab.addColumns('a.sum)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testAddOrReplaceColumnsWithAgg(): Unit = {
 val util = streamTestUtil()
 val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 tab.addOrReplaceColumns('a.sum)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testRenameColumnsWithAgg(): Unit = {
   val util = streamTestUtil()
   val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
   tab.renameColumns('a.sum)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testRenameColumnsWithoutAlias(): Unit = {
 val util = streamTestUtil()
 val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 tab.renameColumns('a)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testRenameColumnsWithFunctallCall(): Unit = {
 val util = streamTestUtil()
 val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 tab.renameColumns('a + 1  as 'a2)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testRenameColumnsNotExist(): Unit = {
 val util = streamTestUtil()
 val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 tab.renameColumns('e as 'e2)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testDropColumnsWithAgg(): Unit = {
 val util = streamTestUtil()
 val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 tab.dropColumns('a.sum)
   }
 
-  @Test(expected = classOf[TableException])
 
 Review comment:
   Isn't this test still valid?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273052867
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/lookups/FieldReferenceLookup.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.lookups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Provides a way to look up field reference by the name of the field.
+ */
+@Internal
+public interface FieldReferenceLookup {
 
 Review comment:
   There is just one implementation. I would prefer merging this internal 
interface and class.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273052519
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
 ##
 @@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.GroupWindow;
+import org.apache.flink.table.api.OverWindow;
+import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias;
+import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.lookups.FieldReferenceLookup;
+import org.apache.flink.table.expressions.lookups.OperationFieldLookup;
+import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
+import org.apache.flink.table.expressions.rules.ResolverRule;
+import org.apache.flink.table.expressions.rules.ResolverRules;
+import org.apache.flink.table.operations.TableOperation;
+import org.apache.flink.table.plan.logical.LogicalOverWindow;
+import org.apache.flink.table.plan.logical.LogicalWindow;
+import org.apache.flink.table.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.plan.logical.TumblingGroupWindow;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Some;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Tries to resolve all unresolved expressions such as {@link 
UnresolvedReferenceExpression}
+ * or calls such as {@link BuiltInFunctionDefinitions#OVER}.
+ *
+ * The default set of rules ({@link 
ExpressionResolverBuilder#getDefaultRules()}) will resolve following references:
+ * 
+ * flatten '*' to all fields of underlying inputs
+ * join over aggregates with corresponding over windows into a single 
resolved call
+ * resolve remaining unresolved references to fields, tables or local 
references
+ * replace call to {@link BuiltInFunctionDefinitions#FLATTEN}
+ * performs call arguments types validation and inserts additional 
casts if possible
+ * 
+ */
+public class ExpressionResolver {
+
+   private final PlannerExpressionConverter bridgeConverter = 
PlannerExpressionConverter.INSTANCE();
+
+   private final FieldReferenceLookup fieldLookup;
+
+   private final TableReferenceLookup tableLookup;
+
+   //TODO change to LocalReferenceLookup, once we don't need to resolve 
fields to create LocalReferenceExpression
+   private final Map localReferences = 
new HashMap<>();
+
+   private final Map overWindows;
+
+   private final Function, List> 
resolveFunction;
+
+   private ExpressionResolver(
+   TableReferenceLookup tableLookup,
+   FieldReferenceLookup fieldLookup,
+   List overWindows,
+   @Nullable GroupWindow groupWindow,
+   List rules) {
+   this.tableLookup = Preconditions.checkNotNull(tableLookup);
+   this.fieldLookup = Preconditions.checkNotNull(fieldLookup);
+   this.resolveFunction = concatenateRules(rules);
+
+   this.overWindows = prepareOverWindows(overWindows);
+   prepareLocalReferencesFromGroupWindows(groupWindow);
+   }
+
+   /**
+* Creates a builder for {@link ExpressionResolver}. One can add 
additional properties to the resolver
+* like e.g. {@link GroupWindow} or {@link OverWindow}. You can also 

[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273108596
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/OperationTreeBuilder.scala
 ##
 @@ -18,16 +18,22 @@
 
 package org.apache.flink.table.plan
 
 Review comment:
   nit: move to `o.a.f.t.operations`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273117168
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 ##
 @@ -41,36 +41,17 @@ import org.apache.flink.table.validate.{ValidationFailure, 
ValidationSuccess}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.tools.nsc.interpreter.JList
 
 Review comment:
   wrong import


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273049675
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
 ##
 @@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.GroupWindow;
+import org.apache.flink.table.api.OverWindow;
+import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias;
+import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.lookups.FieldReferenceLookup;
+import org.apache.flink.table.expressions.lookups.OperationFieldLookup;
+import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
+import org.apache.flink.table.expressions.rules.ResolverRule;
+import org.apache.flink.table.expressions.rules.ResolverRules;
+import org.apache.flink.table.operations.TableOperation;
+import org.apache.flink.table.plan.logical.LogicalOverWindow;
+import org.apache.flink.table.plan.logical.LogicalWindow;
+import org.apache.flink.table.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.plan.logical.TumblingGroupWindow;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Some;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Tries to resolve all unresolved expressions such as {@link 
UnresolvedReferenceExpression}
+ * or calls such as {@link BuiltInFunctionDefinitions#OVER}.
+ *
+ * The default set of rules ({@link 
ExpressionResolverBuilder#getDefaultRules()}) will resolve following references:
+ * 
+ * flatten '*' to all fields of underlying inputs
+ * join over aggregates with corresponding over windows into a single 
resolved call
+ * resolve remaining unresolved references to fields, tables or local 
references
+ * replace call to {@link BuiltInFunctionDefinitions#FLATTEN}
+ * performs call arguments types validation and inserts additional 
casts if possible
+ * 
+ */
+public class ExpressionResolver {
+
+   private final PlannerExpressionConverter bridgeConverter = 
PlannerExpressionConverter.INSTANCE();
+
+   private final FieldReferenceLookup fieldLookup;
+
+   private final TableReferenceLookup tableLookup;
+
+   //TODO change to LocalReferenceLookup, once we don't need to resolve 
fields to create LocalReferenceExpression
+   private final Map localReferences = 
new HashMap<>();
+
+   private final Map overWindows;
+
+   private final Function, List> 
resolveFunction;
+
+   private ExpressionResolver(
+   TableReferenceLookup tableLookup,
+   FieldReferenceLookup fieldLookup,
+   List overWindows,
+   @Nullable GroupWindow groupWindow,
+   List rules) {
+   this.tableLookup = Preconditions.checkNotNull(tableLookup);
+   this.fieldLookup = Preconditions.checkNotNull(fieldLookup);
+   this.resolveFunction = concatenateRules(rules);
+
+   this.overWindows = prepareOverWindows(overWindows);
+   prepareLocalReferencesFromGroupWindows(groupWindow);
+   }
+
+   /**
+* Creates a builder for {@link ExpressionResolver}. One can add 
additional properties to the resolver
+* like e.g. {@link GroupWindow} or {@link OverWindow}. You can also 

[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273102653
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRules.java
 ##
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.rules;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+
+/**
+ * Contains instances of {@link ResolverRule}.
+ */
+@PublicEvolving
+public final class ResolverRules {
+
+   /**
+* Rule that resolves flatten call. See {@link FlattenCallRule} for 
details.
+*/
+   public static final ResolverRule FLATTEN_CALL = new FlattenCallRule();
+
+   /**
+* Resolves {@link UnresolvedReferenceExpression}. See {@link 
ReferenceResolverRule} for details.
+*/
+   public static final ResolverRule FIELD_RESOLVE = new 
ReferenceResolverRule();
+
+   /**
+* Resolves call based on argument types. See {@link 
ResolveCallByArgumentsRule} for details.
+*/
+   public static final ResolverRule RESOLVE_CALL = new 
ResolveCallByArgumentsRule();
+
+   /**
+* Concatenates over aggregations with corresponding over window. See 
{@link OverWindowResolverRule} for details.
+*/
+   public static final ResolverRule OVER_WINDOWS = new 
OverWindowResolverRule();
+
+   /**
+* Resolves '*' expressions to corresponding fields of inputs. See 
{@link StarReferenceFlatteningRule} for details.
+*/
+   public static final ResolverRule FLATTEN_STAR_REFERENCE = new 
StarReferenceFlatteningRule();
+
+   /*
+   NON DEFAULT RULES
+*/
+
+   /**
+* Used in sort operation. It assures expression is wrapped in ordering 
expression. See {@link WrapInOrderRule}
+* for details.
+*/
+   public static final ResolverRule WRAP_IN_ORDER = new WrapInOrderRule();
+
+   /**
+* Used in projection operation. It derives name for expression. See 
{@link NameExpressionRule} for details.
+*/
+   public static final ResolverRule NAME_EXPRESSION = new 
NameExpressionRule();
 
 Review comment:
   This rule is more projection related and should be put into the projection 
builder.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273070489
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
 ##
 @@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.rules;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.CurrentRange;
+import org.apache.flink.table.expressions.CurrentRow;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.PlannerExpression;
+import org.apache.flink.table.plan.logical.LogicalOverWindow;
+import org.apache.flink.table.typeutils.RowIntervalTypeInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.util.Arrays.asList;
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Joins call to {@link BuiltInFunctionDefinitions#OVER} with corresponding 
over window
+ * and creates a fully resolved over aggregation.
+ */
+@Internal
+final class OverWindowResolverRule implements ResolverRule {
+
+   @Override
+   public List apply(List expression, 
ResolutionContext context) {
+   return expression.stream()
+   .map(expr -> expr.accept(new 
ExpressionResolverVisitor(context)))
+   .collect(Collectors.toList());
+   }
+
+   private class ExpressionResolverVisitor extends 
ApiExpressionDefaultVisitor {
+
+   private final ResolutionContext context;
+
+   ExpressionResolverVisitor(ResolutionContext context) {
+   this.context = context;
+   }
+
+   @Override
+   public Expression visitCall(CallExpression call) {
+
+   if (call.getFunctionDefinition() == 
BuiltInFunctionDefinitions.OVER) {
+   List children = call.getChildren();
+   Expression alias = children.get(1);
+
+   LogicalOverWindow referenceWindow = 
context.getOverWindow(alias)
+   .orElseThrow(() -> new 
ValidationException("Could not resolve over call."));
+
+   Expression following = 
calculateOverWindowFollowing(referenceWindow);
+   List newArgs = new 
ArrayList<>(asList(
+   children.get(0),
+   referenceWindow.orderBy(),
+   referenceWindow.preceding(),
+   following));
+
+   newArgs.addAll(referenceWindow.partitionBy());
+   return new 
CallExpression(call.getFunctionDefinition(), newArgs);
+   } else {
+   return new CallExpression(
+   call.getFunctionDefinition(),
+   call.getChildren().stream().map(expr -> 
expr.accept(this)).collect(toList()));
+   }
+   }
+
+   private Expression 
calculateOverWindowFollowing(LogicalOverWindow referenceWindow) {
+   return referenceWindow.following().orElseGet(() -> {
+   PlannerExpression preceding = 
context.bridge(referenceWindow.preceding());
+   if (preceding.resultType() instanceof 
RowIntervalTypeInfo) {
+   return new CurrentRow();
 
 Review comment:
   Change to a call instead of planner expressions.


This is a

[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273106216
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ##
 @@ -42,8 +42,11 @@ import org.apache.flink.table.functions.{AggregateFunction, 
ScalarFunction, Tabl
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
 import org.apache.flink.util.InstantiationUtil
+import java.util.{Optional, List => JList}
 
 Review comment:
   Unrelated changes?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273052322
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
 ##
 @@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.GroupWindow;
+import org.apache.flink.table.api.OverWindow;
+import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias;
+import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.lookups.FieldReferenceLookup;
+import org.apache.flink.table.expressions.lookups.OperationFieldLookup;
+import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
+import org.apache.flink.table.expressions.rules.ResolverRule;
+import org.apache.flink.table.expressions.rules.ResolverRules;
+import org.apache.flink.table.operations.TableOperation;
+import org.apache.flink.table.plan.logical.LogicalOverWindow;
+import org.apache.flink.table.plan.logical.LogicalWindow;
+import org.apache.flink.table.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.plan.logical.TumblingGroupWindow;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Some;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Tries to resolve all unresolved expressions such as {@link 
UnresolvedReferenceExpression}
+ * or calls such as {@link BuiltInFunctionDefinitions#OVER}.
+ *
+ * The default set of rules ({@link 
ExpressionResolverBuilder#getDefaultRules()}) will resolve following references:
+ * 
+ * flatten '*' to all fields of underlying inputs
+ * join over aggregates with corresponding over windows into a single 
resolved call
+ * resolve remaining unresolved references to fields, tables or local 
references
+ * replace call to {@link BuiltInFunctionDefinitions#FLATTEN}
+ * performs call arguments types validation and inserts additional 
casts if possible
+ * 
+ */
+public class ExpressionResolver {
+
+   private final PlannerExpressionConverter bridgeConverter = 
PlannerExpressionConverter.INSTANCE();
+
+   private final FieldReferenceLookup fieldLookup;
+
+   private final TableReferenceLookup tableLookup;
+
+   //TODO change to LocalReferenceLookup, once we don't need to resolve 
fields to create LocalReferenceExpression
+   private final Map localReferences = 
new HashMap<>();
+
+   private final Map overWindows;
+
+   private final Function, List> 
resolveFunction;
+
+   private ExpressionResolver(
+   TableReferenceLookup tableLookup,
+   FieldReferenceLookup fieldLookup,
+   List overWindows,
+   @Nullable GroupWindow groupWindow,
+   List rules) {
+   this.tableLookup = Preconditions.checkNotNull(tableLookup);
+   this.fieldLookup = Preconditions.checkNotNull(fieldLookup);
+   this.resolveFunction = concatenateRules(rules);
+
+   this.overWindows = prepareOverWindows(overWindows);
+   prepareLocalReferencesFromGroupWindows(groupWindow);
+   }
+
+   /**
+* Creates a builder for {@link ExpressionResolver}. One can add 
additional properties to the resolver
+* like e.g. {@link GroupWindow} or {@link OverWindow}. You can also 

[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273120072
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/OverWindowValidationTest.scala
 ##
 @@ -148,6 +148,6 @@ class OverWindowValidationTest extends TableTestBase {
 
 table
 .window(Over orderBy 'rowtime preceding 1.minutes as 'w)
-.select('c, 'a.count over 'w, 'w.start, 'w.end)
+.select('c, 'a.count over 'w, 'w.start + 1, 'w.end + 2)
 
 Review comment:
   Unrelated change?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273101909
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRules.java
 ##
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.rules;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+
+/**
+ * Contains instances of {@link ResolverRule}.
+ */
+@PublicEvolving
 
 Review comment:
   `@Internal`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273017053
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
 ##
 @@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.GroupWindow;
+import org.apache.flink.table.api.OverWindow;
+import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias;
+import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.lookups.FieldReferenceLookup;
+import org.apache.flink.table.expressions.lookups.OperationFieldLookup;
+import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
+import org.apache.flink.table.expressions.rules.ResolverRule;
+import org.apache.flink.table.expressions.rules.ResolverRules;
+import org.apache.flink.table.operations.TableOperation;
+import org.apache.flink.table.plan.logical.LogicalOverWindow;
+import org.apache.flink.table.plan.logical.LogicalWindow;
+import org.apache.flink.table.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.plan.logical.TumblingGroupWindow;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Some;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Tries to resolve all unresolved expressions such as {@link 
UnresolvedReferenceExpression}
+ * or calls such as {@link BuiltInFunctionDefinitions#OVER}.
+ *
+ * The default set of rules ({@link 
ExpressionResolverBuilder#getDefaultRules()}) will resolve following references:
+ * 
+ * flatten '*' to all fields of underlying inputs
+ * join over aggregates with corresponding over windows into a single 
resolved call
+ * resolve remaining unresolved references to fields, tables or local 
references
+ * replace call to {@link BuiltInFunctionDefinitions#FLATTEN}
+ * performs call arguments types validation and inserts additional 
casts if possible
+ * 
+ */
+public class ExpressionResolver {
 
 Review comment:
   `@Internal`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273033445
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
 ##
 @@ -78,12 +83,12 @@ public static SymbolExpression symbol(TableSymbol symbol) {
return new SymbolExpression(symbol);
}
 
-   public static UnresolvedFieldReferenceExpression 
unresolvedFieldRef(String name) {
-   return new UnresolvedFieldReferenceExpression(name);
+   public static UnresolvedReferenceExpression unresolvedFieldRef(String 
name) {
+   return new UnresolvedReferenceExpression(name);
}
 
public static TableReferenceExpression tableRef(String name, Table 
table) {
 
 Review comment:
   Yes, maybe we can make this nicer once we rework table environments and how 
the interplay with catalogs.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273025008
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/lookups/OperationFieldLookup.java
 ##
 @@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.lookups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.operations.TableOperation;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Implementation of {@link FieldReferenceLookup} that gives access to fields 
of given {@link TableOperation}s.
+ */
+@Internal
+public class OperationFieldLookup implements FieldReferenceLookup {
 
 Review comment:
   Merge interface and class into one?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273004862
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
 ##
 @@ -83,6 +84,11 @@
 */
void printSchema();
 
+   /**
+* Returns underlying logical representation of this table.
+*/
+   TableOperation getTableOperation();
 
 Review comment:
   This method is definitely worth discussing because it fits better to Impl. 
On the otherhand this will be the output of the API and we don't need a lot of 
casting at different positions. We will mark this method as internal and can 
still remove it from the interface in the future.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r272965435
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
 ##
 @@ -232,7 +232,7 @@ object PlannerExpressionParserImpl extends JavaTokenParsers
   lazy val literalExpr: PackratParser[Expression] =
 numberLiteral | doubleQuoteStringLiteral | singleQuoteStringLiteral | 
boolLiteral
 
-  lazy val fieldReference: PackratParser[UnresolvedFieldReferenceExpression] = 
(STAR | ident) ^^ {
+  lazy val fieldReference: PackratParser[UnresolvedReferenceExpression] = 
(STAR | ident) ^^ {
 sym => unresolvedFieldRef(sym)
 
 Review comment:
   Please also update the API util class.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r272964598
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
 ##
 @@ -78,12 +83,12 @@ public static SymbolExpression symbol(TableSymbol symbol) {
return new SymbolExpression(symbol);
}
 
-   public static UnresolvedFieldReferenceExpression 
unresolvedFieldRef(String name) {
-   return new UnresolvedFieldReferenceExpression(name);
+   public static UnresolvedReferenceExpression unresolvedFieldRef(String 
name) {
+   return new UnresolvedReferenceExpression(name);
}
 
public static TableReferenceExpression tableRef(String name, Table 
table) {
 
 Review comment:
   True, registering the table under the hood is not perfect design.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services