[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r277669420 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala ## @@ -39,164 +34,39 @@ case class LogicalOverWindow( following: Optional[Expression]) // -// Tumbling group windows +// Group windows // +sealed trait LogicalWindow { + def timeAttribute: PlannerExpression + def aliasAttribute: PlannerExpression +} + case class TumblingGroupWindow( -alias: PlannerExpression, -timeField: PlannerExpression, +aliasAttribute: PlannerExpression, +timeAttribute: PlannerExpression, size: PlannerExpression) - extends LogicalWindow( -alias, -timeField) { - - override def resolveExpressions( - resolve: (PlannerExpression) => PlannerExpression): LogicalWindow = -TumblingGroupWindow( - resolve(alias), - resolve(timeField), - resolve(size)) - - override def validate(tableEnv: TableEnvironment): ValidationResult = -super.validate(tableEnv).orElse( - tableEnv match { - -// check size -case _ if !isTimeIntervalLiteral(size) && !isRowCountLiteral(size) => - ValidationFailure( -"Tumbling window expects size literal of type Interval of Milliseconds " + Review comment: where can I find this check for all types of windows? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r27764 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java ## @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.GroupWindow; +import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias; +import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.AggregateFunctionDefinition; +import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor; +import org.apache.flink.table.expressions.BuiltInFunctionDefinitions; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ExpressionBridge; +import org.apache.flink.table.expressions.ExpressionResolver; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.FunctionDefinition; +import org.apache.flink.table.expressions.LocalReferenceExpression; +import org.apache.flink.table.expressions.PlannerExpression; +import org.apache.flink.table.expressions.ResolvedGroupWindow; +import org.apache.flink.table.expressions.UnresolvedReferenceExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.expressions.WindowReference; +import org.apache.flink.table.plan.logical.Aggregate; +import org.apache.flink.table.plan.logical.LogicalNode; +import org.apache.flink.table.plan.logical.LogicalWindow; +import org.apache.flink.table.plan.logical.SessionGroupWindow; +import org.apache.flink.table.plan.logical.SlidingGroupWindow; +import org.apache.flink.table.plan.logical.TumblingGroupWindow; +import org.apache.flink.table.plan.logical.WindowAggregate; +import org.apache.flink.table.typeutils.RowIntervalTypeInfo; +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; +import org.apache.flink.table.typeutils.TimeIntervalTypeInfo; + +import java.util.List; +import java.util.stream.Collectors; + +import scala.Some; + +import static java.util.Collections.singletonList; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO; +import static org.apache.flink.table.expressions.FunctionDefinition.Type.AGGREGATE_FUNCTION; +import static org.apache.flink.table.expressions.ResolvedGroupWindow.WindowType.SLIDE; +import static org.apache.flink.table.expressions.ResolvedGroupWindow.WindowType.TUMBLE; +import static org.apache.flink.table.typeutils.RowIntervalTypeInfo.INTERVAL_ROWS; + +/** + * Utility class for creating a valid {@link Aggregate} or {@link WindowAggregate}. + */ +@Internal +public class AggregateOperationFactory { + + private final boolean isStreaming; + private final ExpressionBridge expressionBridge; + private final GroupingExpressionValidator groupingExpressionValidator = new GroupingExpressionValidator(); + private final NoNestedAggregates noNestedAggregates = new NoNestedAggregates(); + private final ValidateDistinct validateDistinct = new ValidateDistinct(); + + public AggregateOperationFactory(ExpressionBridge expressionBridge, boolean isStreaming) { + this.expressionBridge = expressionBridge; + this.isStreaming = isStreaming; + } + + /** +* Creates a valid {@link Aggregate} operation. +* +* @param groupings expressions describing grouping key of aggregates +* @param aggregates expressions describing aggregation functi
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r277656998 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java ## @@ -121,11 +115,13 @@ private CalculatedTable createFunctionCall( fieldNames = aliases.toArray(new String[aliasesSize]); } - return new CalculatedTable( + TypeInformation[] fieldTypes = TableEnvironment$.MODULE$.getFieldTypes(resultType); Review comment: ugly but ok for now :/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r277660222 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/TableOperationToRelNodeConverter.java ## @@ -154,6 +162,39 @@ public RelNode visitOther(TableOperation other) { throw new TableException("Unknown table operation: " + other); } + @Override + public RelNode visitCalculatedTable(CalculatedTableOperation calculatedTable) { + String[] fieldNames = calculatedTable.getTableSchema().getFieldNames(); + int[] fieldIndices = IntStream.range(0, fieldNames.length).toArray(); + TypeInformation resultType = calculatedTable.getResultType(); + + @SuppressWarnings("unchecked") Review comment: Add generics to CalculatedTableOperation to avoid this suppression. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r277652996 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/TableOperationToRelNodeConverter.java ## @@ -120,4 +144,28 @@ public RelNode visitOther(TableOperation other) { .collect(toList()); } } + + private class AggregateVisitor extends ExpressionDefaultVisitor { + + @Override + public AggCall visitCall(CallExpression call) { + if (call.getFunctionDefinition() == AS) { + String aggregateName = extractValue(call.getChildren().get(1), Review comment: Should we move `extractValue` and `isFunctionOfType` to `ExpressionUtils` in `common`? Because it has nothing to do with API similar to `ExpressionDefaultVisitor`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r277655720 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java ## @@ -147,4 +147,9 @@ public static Expression toRowInterval(Expression e) { } return Optional.empty(); } + + public static boolean isFunctionOfType(Expression expr, FunctionDefinition.Type type) { Review comment: Replace with this call at other locations as well? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r277637504 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/WindowAggregateTableOperation.java ## @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.util.StringUtils; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import static org.apache.flink.table.operations.WindowAggregateTableOperation.ResolvedGroupWindow.WindowType.SESSION; +import static org.apache.flink.table.operations.WindowAggregateTableOperation.ResolvedGroupWindow.WindowType.SLIDE; +import static org.apache.flink.table.operations.WindowAggregateTableOperation.ResolvedGroupWindow.WindowType.TUMBLE; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Review comment: Missing comment. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275319300 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TableOperationDefaultVisitor.java ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; + +/** + * A utility {@link TableOperationVisitor} that calls Review comment: I think we don't need a default visitor. Only one implementation is excepted in the future, no? All implementations should be forced to implement a logic if we add new operations. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275319075 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TableOperationVisitor.java ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; + +/** + * Class that implements visitor pattern. It allows type safe logic on top of tree + * of {@link TableOperation}s. + */ +@Internal +public interface TableOperationVisitor { Review comment: nit: one empty line between first declaration and class/interface declaration. I know it is not written in any coding style but this common in Java. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275322294 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/TableOperationToRelNodeConverter.java ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.plan.logical.LogicalNode; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.tools.RelBuilder; + +/** + * Converter from Flink's specific relational representation: {@link TableOperation} to Calcite's specific relational + * representation: {@link RelNode}. + */ +@Internal +public class TableOperationToRelNodeConverter extends TableOperationDefaultVisitor { Review comment: Move this to `o.a.f.table.plan`? Maybe we can find a shorter name? Maybe `TableOperationConverter` similar to `PlannerExpressionConverter`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275319676 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java ## @@ -68,7 +67,7 @@ public ProjectionOperationFactory(ExpressionBridge expression public Project create( List projectList, - LogicalNode childNode, + TableOperation childNode, Review comment: rename variable as well ;-) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275330244 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/AlgebraicTableOperation.java ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableSchema; + +import java.util.Arrays; +import java.util.List; + +/** + * An algebraic operation on two relations. It provides a way to union, intersect or subtract underlying + * data sets/streams. Both relations must have equal schemas. + */ +@Internal +public class AlgebraicTableOperation implements TableOperation { Review comment: looks beautiful :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r27533 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ## @@ -93,12 +94,19 @@ abstract class TableEnvironment(val config: TableConfig) { .typeSystem(new FlinkTypeSystem) .operatorTable(getSqlOperatorTable) .sqlToRelConverterConfig(getSqlToRelConverterConfig) +.context(Contexts Review comment: add a comment why this is necessary (aka temporal tables). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275327749 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala ## @@ -55,8 +56,7 @@ class TableImpl( var tableName: String = _ - def getRelNode: RelNode = operationTree.asInstanceOf[LogicalNode] -.toRelNode(tableEnv.getRelBuilder) + def getRelNode: RelNode = tableEnv.getRelBuilder.flinkTableOperation(operationTree).build() Review comment: rename to `tableOperation`. I would avoid the name of the project in identifiers. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275315597 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/JoinOperationFactory.java ## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.operators.join.JoinType; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor; +import org.apache.flink.table.expressions.ApiExpressionUtils; +import org.apache.flink.table.expressions.BuiltInFunctionDefinitions; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ExpressionBridge; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.PlannerExpression; +import org.apache.flink.table.plan.logical.CalculatedTable; +import org.apache.flink.table.plan.logical.Join; +import org.apache.flink.table.plan.logical.LogicalNode; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +import static java.util.Arrays.asList; + +/** + * Utility class for creating a valid {@link Join} operation. + */ +@Internal +public class JoinOperationFactory { + + private final ExpressionBridge expressionBridge; + private final EquiJoinExistsChecker equiJoinExistsChecker = new EquiJoinExistsChecker(); + + public JoinOperationFactory(ExpressionBridge expressionBridge) { + this.expressionBridge = expressionBridge; + } + + /** +* Creates a valid {@link Join} operation. +* +* It performs validations such as: +* +* condition returns boolean +* the condition is either always true or contains equi join +* left and right side of the join do not contain ambiguous column names +* that correlated join is an INNER join +* +* +* @param left left side of the relational operation +* @param right right side of the relational operation +* @param joinType what sort of join to create +* @param condition join condition to apply +* @param correlated if the join should be a correlated join +* @return valid join operation +*/ + public LogicalNode create( + TableOperation left, + TableOperation right, + JoinType joinType, + Expression condition, + boolean correlated) { + verifyConditionType(condition); + validateNamesAmbiguity(left, right); + validateCondition(right, joinType, condition, correlated); + + PlannerExpression plannerExpression = expressionBridge.bridge(condition); + return new Join((LogicalNode) left, (LogicalNode) right, joinType, Optional.of(plannerExpression), correlated); + } + + private void validateCondition(TableOperation right, JoinType joinType, Expression condition, boolean correlated) { + boolean alwaysTrue = ApiExpressionUtils.extractValue(condition, Types.BOOLEAN).orElse(false); + + if (alwaysTrue) { + return; + } + + Boolean equiJoinExists = condition.accept(equiJoinExistsChecker); + if (correlated && right instanceof CalculatedTable && joinType != JoinType.INNER) { + throw new ValidationException("TableFunction left outer join predicate can only be empty or literal true."); Review comment: I think we are calling it "lateral join with a table function" now, right? This is an automated message from the
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275315193 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/JoinOperationFactory.java ## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.operators.join.JoinType; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor; +import org.apache.flink.table.expressions.ApiExpressionUtils; +import org.apache.flink.table.expressions.BuiltInFunctionDefinitions; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ExpressionBridge; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.PlannerExpression; +import org.apache.flink.table.plan.logical.CalculatedTable; +import org.apache.flink.table.plan.logical.Join; +import org.apache.flink.table.plan.logical.LogicalNode; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +import static java.util.Arrays.asList; + +/** + * Utility class for creating a valid {@link Join} operation. + */ +@Internal +public class JoinOperationFactory { + + private final ExpressionBridge expressionBridge; + private final EquiJoinExistsChecker equiJoinExistsChecker = new EquiJoinExistsChecker(); + + public JoinOperationFactory(ExpressionBridge expressionBridge) { + this.expressionBridge = expressionBridge; + } + + /** +* Creates a valid {@link Join} operation. +* +* It performs validations such as: +* +* condition returns boolean +* the condition is either always true or contains equi join +* left and right side of the join do not contain ambiguous column names +* that correlated join is an INNER join +* +* +* @param left left side of the relational operation +* @param right right side of the relational operation +* @param joinType what sort of join to create +* @param condition join condition to apply +* @param correlated if the join should be a correlated join +* @return valid join operation +*/ + public LogicalNode create( + TableOperation left, + TableOperation right, + JoinType joinType, + Expression condition, + boolean correlated) { + verifyConditionType(condition); + validateNamesAmbiguity(left, right); + validateCondition(right, joinType, condition, correlated); + + PlannerExpression plannerExpression = expressionBridge.bridge(condition); + return new Join((LogicalNode) left, (LogicalNode) right, joinType, Optional.of(plannerExpression), correlated); + } + + private void validateCondition(TableOperation right, JoinType joinType, Expression condition, boolean correlated) { + boolean alwaysTrue = ApiExpressionUtils.extractValue(condition, Types.BOOLEAN).orElse(false); + + if (alwaysTrue) { + return; + } + + Boolean equiJoinExists = condition.accept(equiJoinExistsChecker); + if (correlated && right instanceof CalculatedTable && joinType != JoinType.INNER) { Review comment: Introduce our own join types enum. We should not use enums of other APIs. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For quer
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275312046 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AlgebraicOperationFactory.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.plan.logical.Intersect; +import org.apache.flink.table.plan.logical.LogicalNode; +import org.apache.flink.table.plan.logical.Minus; +import org.apache.flink.table.plan.logical.Union; + +import java.util.stream.IntStream; + +import static org.apache.flink.table.operations.AlgebraicOperationFactory.AlgebraicTableOperationType.UNION; + +/** + * Utility class for creating a valid algebraic operation such as {@link Minus}, {@link Intersect} or {@link Union}. + */ +@Internal +public class AlgebraicOperationFactory { Review comment: Call this `SetOperation` instead (also in comments and related classes) to be consistent with tests, rules, etc. https://en.wikipedia.org/wiki/Set_operations_(SQL) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275313133 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AlgebraicOperationFactory.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.plan.logical.Intersect; +import org.apache.flink.table.plan.logical.LogicalNode; +import org.apache.flink.table.plan.logical.Minus; +import org.apache.flink.table.plan.logical.Union; + +import java.util.stream.IntStream; + +import static org.apache.flink.table.operations.AlgebraicOperationFactory.AlgebraicTableOperationType.UNION; + +/** + * Utility class for creating a valid algebraic operation such as {@link Minus}, {@link Intersect} or {@link Union}. + */ +@Internal +public class AlgebraicOperationFactory { + + private final boolean isStreaming; + + /** +* Describes what kind of operation should be created. +*/ + public enum AlgebraicTableOperationType { + INTERSECT, + MINUS, + UNION + } + + public AlgebraicOperationFactory(boolean isStreaming) { + this.isStreaming = isStreaming; + } + + /** +* Creates a valid algebraic operation. +* +* @param type type of operation to create +* @param left first relational operation of the operation +* @param right second relational operation of the operation +* @param all flag defining how duplicates should be handled +* @return creates a valid algebraic operation +*/ + public TableOperation create( + AlgebraicTableOperationType type, + TableOperation left, + TableOperation right, + boolean all) { + LogicalNode leftNode = (LogicalNode) left; + LogicalNode rightNode = (LogicalNode) right; + failIfStreaming(type, all); + validateAlgebraicOperation(type, leftNode, right); + switch (type) { + case INTERSECT: + return new Intersect(leftNode, rightNode, all); + case MINUS: + return new Minus(leftNode, rightNode, all); + case UNION: + return new Union(leftNode, rightNode, all); + } + throw new TableException("Unknown algebraic operation type: " + type); + } + + private void validateAlgebraicOperation( + AlgebraicTableOperationType operationType, + TableOperation left, + TableOperation right) { + TableSchema leftSchema = left.getTableSchema(); + int leftFieldCount = leftSchema.getFieldCount(); + TableSchema rightSchema = right.getTableSchema(); + int rightFieldCount = rightSchema.getFieldCount(); + + if (leftFieldCount != rightFieldCount) { + throw new ValidationException(String.format("%s two table of different column sizes:" + + " %d and %d", operationType, leftFieldCount, rightFieldCount)); + } + + TypeInformation[] leftFieldTypes = leftSchema.getFieldTypes(); + TypeInformation[] rightFieldTypes = rightSchema.getFieldTypes(); + boolean sameSchema = IntStream.range(0, leftFieldCount) + .allMatch(idx -> leftFieldTypes[idx].equals(rightFieldTypes[idx])); + + if (!sameSchema) { + throw new ValidationException(String.format("%s two tab
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275309975 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java ## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.AggregateFunctionDefinition; +import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor; +import org.apache.flink.table.expressions.BuiltInFunctionDefinitions; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ExpressionBridge; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.FunctionDefinition; +import org.apache.flink.table.expressions.LocalReferenceExpression; +import org.apache.flink.table.expressions.PlannerExpression; +import org.apache.flink.table.plan.logical.Aggregate; +import org.apache.flink.table.plan.logical.LogicalNode; +import org.apache.flink.table.plan.logical.LogicalWindow; +import org.apache.flink.table.plan.logical.SlidingGroupWindow; +import org.apache.flink.table.plan.logical.TumblingGroupWindow; +import org.apache.flink.table.plan.logical.WindowAggregate; +import org.apache.flink.table.typeutils.RowIntervalTypeInfo; + +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.table.expressions.FunctionDefinition.Type.AGGREGATE_FUNCTION; + +/** + * Utility class for creating a valid {@link Aggregate} or {@link WindowAggregate}. + */ +@Internal +public class AggregateOperationFactory { + + private final boolean isStreaming; + private final ExpressionBridge expressionBridge; + private final GroupingExpressionValidator groupingExpressionValidator = new GroupingExpressionValidator(); + private final NoChainedAggregates noChainedAggregates = new NoChainedAggregates(); + private final ValidateDistinct validateDistinct = new ValidateDistinct(); + + public AggregateOperationFactory(ExpressionBridge expressionBridge, boolean isStreaming) { + this.expressionBridge = expressionBridge; + this.isStreaming = isStreaming; + } + + /** +* Creates a valid {@link Aggregate} operation. +* +* @param groupings expressions describing grouping key of aggregates +* @param aggregates expressions describing aggregation functions +* @param child relational operation on top of which to apply the aggregation +* @return valid aggregate operation +*/ + public Aggregate createAggregate( + List groupings, + List aggregates, + TableOperation child) { + + LogicalNode childNode = (LogicalNode) child; + validateGroupings(groupings); + validateAggregates(groupings, aggregates); + + List convertedGroupings = bridge(groupings); + List convertedAggregates = bridge(aggregates); + return new Aggregate(convertedGroupings, convertedAggregates, childNode); + } + + /** +* Creates a valid {@link WindowAggregate} operation. +* +* @param groupings expressions describing grouping key of aggregates +* @param aggregates expressions describing aggregation functions +* @param windowProperties expressions describing window properties +* @param window grouping window of this aggregation +* @param child relational operation on top of which to apply the aggregation +* @return valid window aggregate operation +*/ + public WindowAggregate createWindowAggregate( + List groupings, + List aggregates, +
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275310522 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java ## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.AggregateFunctionDefinition; +import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor; +import org.apache.flink.table.expressions.BuiltInFunctionDefinitions; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ExpressionBridge; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.FunctionDefinition; +import org.apache.flink.table.expressions.LocalReferenceExpression; +import org.apache.flink.table.expressions.PlannerExpression; +import org.apache.flink.table.plan.logical.Aggregate; +import org.apache.flink.table.plan.logical.LogicalNode; +import org.apache.flink.table.plan.logical.LogicalWindow; +import org.apache.flink.table.plan.logical.SlidingGroupWindow; +import org.apache.flink.table.plan.logical.TumblingGroupWindow; +import org.apache.flink.table.plan.logical.WindowAggregate; +import org.apache.flink.table.typeutils.RowIntervalTypeInfo; + +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.table.expressions.FunctionDefinition.Type.AGGREGATE_FUNCTION; + +/** + * Utility class for creating a valid {@link Aggregate} or {@link WindowAggregate}. + */ +@Internal +public class AggregateOperationFactory { + + private final boolean isStreaming; + private final ExpressionBridge expressionBridge; + private final GroupingExpressionValidator groupingExpressionValidator = new GroupingExpressionValidator(); + private final NoChainedAggregates noChainedAggregates = new NoChainedAggregates(); + private final ValidateDistinct validateDistinct = new ValidateDistinct(); + + public AggregateOperationFactory(ExpressionBridge expressionBridge, boolean isStreaming) { + this.expressionBridge = expressionBridge; + this.isStreaming = isStreaming; + } + + /** +* Creates a valid {@link Aggregate} operation. +* +* @param groupings expressions describing grouping key of aggregates +* @param aggregates expressions describing aggregation functions +* @param child relational operation on top of which to apply the aggregation +* @return valid aggregate operation +*/ + public Aggregate createAggregate( + List groupings, + List aggregates, + TableOperation child) { + + LogicalNode childNode = (LogicalNode) child; + validateGroupings(groupings); + validateAggregates(groupings, aggregates); + + List convertedGroupings = bridge(groupings); + List convertedAggregates = bridge(aggregates); + return new Aggregate(convertedGroupings, convertedAggregates, childNode); + } + + /** +* Creates a valid {@link WindowAggregate} operation. +* +* @param groupings expressions describing grouping key of aggregates +* @param aggregates expressions describing aggregation functions +* @param windowProperties expressions describing window properties +* @param window grouping window of this aggregation +* @param child relational operation on top of which to apply the aggregation +* @return valid window aggregate operation +*/ + public WindowAggregate createWindowAggregate( + List groupings, + List aggregates, +
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275307566 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala ## @@ -50,43 +48,35 @@ case class TumblingGroupWindow( alias, timeField) { - override def resolveExpressions( - resolve: (PlannerExpression) => PlannerExpression): LogicalWindow = -TumblingGroupWindow( - resolve(alias), - resolve(timeField), - resolve(size)) + override def validate(isStreaming: Boolean): Unit = - override def validate(tableEnv: TableEnvironment): ValidationResult = -super.validate(tableEnv).orElse( - tableEnv match { + isStreaming match { // check size case _ if !isTimeIntervalLiteral(size) && !isRowCountLiteral(size) => - ValidationFailure( + throw new ValidationException( "Tumbling window expects size literal of type Interval of Milliseconds " + "or Interval of Rows.") // check time attribute -case _: StreamTableEnvironment if !isTimeAttribute(timeField) => - ValidationFailure( +case true if !isTimeAttribute(timeField) => Review comment: use a regular if/else statement This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275308796 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java ## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.AggregateFunctionDefinition; +import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor; +import org.apache.flink.table.expressions.BuiltInFunctionDefinitions; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ExpressionBridge; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.FunctionDefinition; +import org.apache.flink.table.expressions.LocalReferenceExpression; +import org.apache.flink.table.expressions.PlannerExpression; +import org.apache.flink.table.plan.logical.Aggregate; +import org.apache.flink.table.plan.logical.LogicalNode; +import org.apache.flink.table.plan.logical.LogicalWindow; +import org.apache.flink.table.plan.logical.SlidingGroupWindow; +import org.apache.flink.table.plan.logical.TumblingGroupWindow; +import org.apache.flink.table.plan.logical.WindowAggregate; +import org.apache.flink.table.typeutils.RowIntervalTypeInfo; + +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.table.expressions.FunctionDefinition.Type.AGGREGATE_FUNCTION; + +/** + * Utility class for creating a valid {@link Aggregate} or {@link WindowAggregate}. + */ +@Internal +public class AggregateOperationFactory { + + private final boolean isStreaming; + private final ExpressionBridge expressionBridge; + private final GroupingExpressionValidator groupingExpressionValidator = new GroupingExpressionValidator(); + private final NoChainedAggregates noChainedAggregates = new NoChainedAggregates(); + private final ValidateDistinct validateDistinct = new ValidateDistinct(); + + public AggregateOperationFactory(ExpressionBridge expressionBridge, boolean isStreaming) { + this.expressionBridge = expressionBridge; + this.isStreaming = isStreaming; + } + + /** +* Creates a valid {@link Aggregate} operation. +* +* @param groupings expressions describing grouping key of aggregates +* @param aggregates expressions describing aggregation functions +* @param child relational operation on top of which to apply the aggregation +* @return valid aggregate operation +*/ + public Aggregate createAggregate( + List groupings, + List aggregates, + TableOperation child) { + + LogicalNode childNode = (LogicalNode) child; + validateGroupings(groupings); + validateAggregates(groupings, aggregates); + + List convertedGroupings = bridge(groupings); + List convertedAggregates = bridge(aggregates); + return new Aggregate(convertedGroupings, convertedAggregates, childNode); + } + + /** +* Creates a valid {@link WindowAggregate} operation. +* +* @param groupings expressions describing grouping key of aggregates +* @param aggregates expressions describing aggregation functions +* @param windowProperties expressions describing window properties +* @param window grouping window of this aggregation +* @param child relational operation on top of which to apply the aggregation +* @return valid window aggregate operation +*/ + public WindowAggregate createWindowAggregate( + List groupings, + List aggregates, +
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275307872 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala ## @@ -104,54 +94,45 @@ case class SlidingGroupWindow( alias, timeField) { - override def resolveExpressions( - resolve: (PlannerExpression) => PlannerExpression): LogicalWindow = -SlidingGroupWindow( - resolve(alias), - resolve(timeField), - resolve(size), - resolve(slide)) - - override def validate(tableEnv: TableEnvironment): ValidationResult = -super.validate(tableEnv).orElse( - tableEnv match { + override def validate(isStreaming: Boolean): Unit = +isStreaming match { // check size case _ if !isTimeIntervalLiteral(size) && !isRowCountLiteral(size) => - ValidationFailure( + throw new ValidationException( "Sliding window expects size literal of type Interval of Milliseconds " + "or Interval of Rows.") // check slide case _ if !isTimeIntervalLiteral(slide) && !isRowCountLiteral(slide) => - ValidationFailure( + throw new ValidationException( "Sliding window expects slide literal of type Interval of Milliseconds " + "or Interval of Rows.") // check same type of intervals case _ if isTimeIntervalLiteral(size) != isTimeIntervalLiteral(slide) => - ValidationFailure("Sliding window expects same type of size and slide.") + throw new ValidationException("Sliding window expects same type of size and slide.") // check time attribute -case _: StreamTableEnvironment if !isTimeAttribute(timeField) => - ValidationFailure( +case true if !isTimeAttribute(timeField) => + throw new ValidationException( "Sliding window expects a time attribute for grouping in a stream environment.") -case _: BatchTableEnvironment +case false if !(isTimePoint(timeField.resultType) || isLong(timeField.resultType)) => - ValidationFailure( + throw new ValidationException( "Sliding window expects a time attribute for grouping in a stream environment.") // check row intervals on event-time -case _: StreamTableEnvironment +case true if isRowCountLiteral(size) && isRowtimeAttribute(timeField) => - ValidationFailure( + throw new ValidationException( "Event-time grouping windows on row intervals in a stream environment " + "are currently not supported.") case _ => - ValidationSuccess + // validation successful } -) + Review comment: nit: remove empty line This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275307745 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala ## @@ -30,14 +29,12 @@ import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, Val */ abstract class LogicalWindow( val aliasAttribute: PlannerExpression, -val timeAttribute: PlannerExpression) - extends Resolvable[LogicalWindow] { +val timeAttribute: PlannerExpression) { - def resolveExpressions(resolver: (PlannerExpression) => PlannerExpression): LogicalWindow = this - - def validate(tableEnv: TableEnvironment): ValidationResult = aliasAttribute match { -case WindowReference(_, _) => ValidationSuccess -case _ => ValidationFailure("Window reference for window expected.") + def validate(isStreaming: Boolean): Unit = aliasAttribute match { +case WindowReference(_, _) => Review comment: add a comment here, first I thought it is a fall-through This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275290056 ## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala ## @@ -86,7 +85,7 @@ class CorrelateValidationTest extends TableTestBase { util.addFunction("func2", new TableFunc2) expectExceptionThrown( t.joinLateral("func2(c, c)"), - "Given parameters of function 'func2' do not match any signature") + "Invalid arguments [[c, c]] for call: func2(c, c)") Review comment: Is this exception useful? And also `[[c, c]]` and not `[c, c]`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275289448 ## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala ## @@ -156,12 +156,12 @@ class TemporalTableJoinTest extends TableTestBase { inputRates: TemporalTableFunction, proctime: Boolean = false): Unit = { val rates = inputRates.asInstanceOf[TemporalTableFunctionImpl] -assertTrue(rates.getPrimaryKey.isInstanceOf[ResolvedFieldReference]) -assertEquals("currency", rates.getPrimaryKey.asInstanceOf[ResolvedFieldReference].name) -assertTrue(rates.getTimeAttribute.isInstanceOf[ResolvedFieldReference]) +assertTrue(rates.getPrimaryKey.isInstanceOf[FieldReferenceExpression]) Review comment: nit: use object equality instead of casting. Just construct the expression that you except. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275275067 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/SortOperationFactory.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ExpressionBridge; +import org.apache.flink.table.expressions.PlannerExpression; +import org.apache.flink.table.plan.logical.Limit; +import org.apache.flink.table.plan.logical.LogicalNode; +import org.apache.flink.table.plan.logical.Sort; + +import java.util.List; +import java.util.stream.Collectors; + +import static java.util.Collections.singletonList; +import static org.apache.flink.table.expressions.BuiltInFunctionDefinitions.ORDERING; +import static org.apache.flink.table.expressions.BuiltInFunctionDefinitions.ORDER_ASC; + +/** + * Utility class for creating a valid {@link Sort} operation. + */ +@Internal +public class SortOperationFactory { + + private final boolean isStreaming; + private final ExpressionBridge expressionBridge; + private final OrderWrapper orderWrapper = new OrderWrapper(); + + public SortOperationFactory( + ExpressionBridge expressionBridge, + boolean isStreaming) { + this.expressionBridge = expressionBridge; + this.isStreaming = isStreaming; + } + + /** +* Creates a valid {@link Sort} operation. +* +* NOTE: if the collation is not explicitly specified for any expression, it is wrapped in a +* default ascending order +* +* @param orders expressions describing order, +* @param child relational expression on top of which to apply the sort operation +* @return valid sort operation +*/ + public Sort createSort(List orders, TableOperation child) { + failIfStreaming(); + + List convertedOrders = orders.stream() + .map(f -> f.accept(orderWrapper)) + .map(expressionBridge::bridge) + .collect(Collectors.toList()); + return new Sort(convertedOrders, (LogicalNode) child); + } + + /** +* Creates a valid {@link Limit} operation. +* +* @param offset offset to start from +* @param fetch number of records to fetch +* @return valid limit operation +*/ + public Limit createLimit(int offset, int fetch, TableOperation child) { + failIfStreaming(); + + if (!(child instanceof Sort)) { + throw new ValidationException("Limit operator must be preceded by an OrderBy operator."); + } + if (offset < 0) { + throw new ValidationException("Offset should be greater than or equal to zero."); + } + + return new Limit(offset, fetch, (LogicalNode) child); + } + + private void failIfStreaming() { + if (isStreaming) { + throw new ValidationException("Limit on stream tables is currently not supported."); Review comment: nit: rename exception to "A limit operation on unbounded tables is currently not supported." we should be consistent with the concepts. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275273674 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/SortOperationFactory.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ExpressionBridge; +import org.apache.flink.table.expressions.PlannerExpression; +import org.apache.flink.table.plan.logical.Limit; +import org.apache.flink.table.plan.logical.LogicalNode; +import org.apache.flink.table.plan.logical.Sort; + +import java.util.List; +import java.util.stream.Collectors; + +import static java.util.Collections.singletonList; +import static org.apache.flink.table.expressions.BuiltInFunctionDefinitions.ORDERING; +import static org.apache.flink.table.expressions.BuiltInFunctionDefinitions.ORDER_ASC; + +/** + * Utility class for creating a valid {@link Sort} operation. + */ +@Internal +public class SortOperationFactory { + + private final boolean isStreaming; + private final ExpressionBridge expressionBridge; + private final OrderWrapper orderWrapper = new OrderWrapper(); + + public SortOperationFactory( + ExpressionBridge expressionBridge, + boolean isStreaming) { + this.expressionBridge = expressionBridge; + this.isStreaming = isStreaming; + } + + /** +* Creates a valid {@link Sort} operation. +* +* NOTE: if the collation is not explicitly specified for any expression, it is wrapped in a +* default ascending order +* +* @param orders expressions describing order, +* @param child relational expression on top of which to apply the sort operation +* @return valid sort operation +*/ + public Sort createSort(List orders, TableOperation child) { + failIfStreaming(); + + List convertedOrders = orders.stream() + .map(f -> f.accept(orderWrapper)) + .map(expressionBridge::bridge) + .collect(Collectors.toList()); + return new Sort(convertedOrders, (LogicalNode) child); + } + + /** +* Creates a valid {@link Limit} operation. +* +* @param offset offset to start from +* @param fetch number of records to fetch +* @return valid limit operation +*/ + public Limit createLimit(int offset, int fetch, TableOperation child) { + failIfStreaming(); + + if (!(child instanceof Sort)) { + throw new ValidationException("Limit operator must be preceded by an OrderBy operator."); Review comment: nit: rename exception to "A limit operation must be preceded by a sort operation."? The case class names are not valid anymore and were misleading anyway. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275270486 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.Types; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor; +import org.apache.flink.table.expressions.ApiExpressionUtils; +import org.apache.flink.table.expressions.BuiltInFunctionDefinitions; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.UnresolvedReferenceExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Utility class for creating valid alias expressions that can be later used as a projection. + */ +@Internal +public final class AliasOperationUtils { + + private static final AliasLiteralValidator aliasLiteralValidator = new AliasLiteralValidator(); + + /** +* Creates a list of valid alias expressions. Resulting expression might still contain +* {@link UnresolvedReferenceExpression}. +* +* @param aliases aliases to validate +* @param child relational operation on top of which to apply the aliases +* @return validated list of aliases +*/ + public static List createAliasList(List aliases, TableOperation child) { + TableSchema childSchema = child.getTableSchema(); + + if (aliases.size() > childSchema.getFieldCount()) { + throw new ValidationException("Aliasing more fields than we actually have"); + } + + List fieldAliases = aliases.stream() + .map(f -> f.accept(aliasLiteralValidator)) + .collect(Collectors.toList()); + + String[] childNames = childSchema.getFieldNames(); + return IntStream.range(0, childNames.length) + .mapToObj(idx -> { + UnresolvedReferenceExpression oldField = new UnresolvedReferenceExpression(childNames[idx]); + if (idx < fieldAliases.size()) { + ValueLiteralExpression alias = fieldAliases.get(idx); + return new CallExpression(BuiltInFunctionDefinitions.AS, Arrays.asList(oldField, alias)); + } else { + return oldField; + } + }).collect(Collectors.toList()); + } + + private static class AliasLiteralValidator extends ApiExpressionDefaultVisitor { + + @Override + public ValueLiteralExpression visitValueLiteral(ValueLiteralExpression valueLiteralExpression) { + String name = ApiExpressionUtils.extractValue(valueLiteralExpression, Types.STRING()) + .orElseThrow(() -> new ValidationException( + "Alias only accept name expressions as arguments that is not '*'")); + + if (name.equals("*")) { Review comment: extract '*' into a constant? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275271583 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.Types; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor; +import org.apache.flink.table.expressions.ApiExpressionUtils; +import org.apache.flink.table.expressions.BuiltInFunctionDefinitions; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.UnresolvedReferenceExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Utility class for creating valid alias expressions that can be later used as a projection. + */ +@Internal +public final class AliasOperationUtils { + + private static final AliasLiteralValidator aliasLiteralValidator = new AliasLiteralValidator(); + + /** +* Creates a list of valid alias expressions. Resulting expression might still contain +* {@link UnresolvedReferenceExpression}. +* +* @param aliases aliases to validate +* @param child relational operation on top of which to apply the aliases +* @return validated list of aliases +*/ + public static List createAliasList(List aliases, TableOperation child) { + TableSchema childSchema = child.getTableSchema(); + + if (aliases.size() > childSchema.getFieldCount()) { + throw new ValidationException("Aliasing more fields than we actually have"); + } + + List fieldAliases = aliases.stream() + .map(f -> f.accept(aliasLiteralValidator)) + .collect(Collectors.toList()); + + String[] childNames = childSchema.getFieldNames(); + return IntStream.range(0, childNames.length) + .mapToObj(idx -> { + UnresolvedReferenceExpression oldField = new UnresolvedReferenceExpression(childNames[idx]); + if (idx < fieldAliases.size()) { + ValueLiteralExpression alias = fieldAliases.get(idx); + return new CallExpression(BuiltInFunctionDefinitions.AS, Arrays.asList(oldField, alias)); + } else { + return oldField; + } + }).collect(Collectors.toList()); + } + + private static class AliasLiteralValidator extends ApiExpressionDefaultVisitor { + + @Override + public ValueLiteralExpression visitValueLiteral(ValueLiteralExpression valueLiteralExpression) { + String name = ApiExpressionUtils.extractValue(valueLiteralExpression, Types.STRING()) + .orElseThrow(() -> new ValidationException( + "Alias only accept name expressions as arguments that is not '*'")); Review comment: Could you update the exception messages? Also add a dot at the end and fix grammar mistakes. What are `name expressions`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273130779 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala ## @@ -71,15 +98,22 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp val extraNames = args .drop(2) .map(e => getValue[String](e)) -Alias(args.head, name, extraNames) +if (extraNames.nonEmpty) { + TableAlias(args.head, name, extraNames) Review comment: I agree with Hequn. Maybe we can avoid this change. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273114839 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/OperationTreeBuilder.scala ## @@ -37,6 +43,25 @@ import _root_.scala.collection.JavaConverters._ class OperationTreeBuilder(private val tableEnv: TableEnvironment) { private val expressionBridge: ExpressionBridge[PlannerExpression] = tableEnv.expressionBridge + private val columnOperationsFactory = new ColumnOperationsFactory Review comment: This is not really a factory and can just be a util class with static methods right? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273101387 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/WrapInOrderRule.java ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions.rules; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.FunctionDefinition; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static java.util.Collections.singletonList; +import static org.apache.flink.table.expressions.BuiltInFunctionDefinitions.ORDER_ASC; +import static org.apache.flink.table.expressions.BuiltInFunctionDefinitions.ORDER_DESC; + +/** + * Makes sure that all expressions are wrapped in ordering expression. If the expression is either + * {@link org.apache.flink.table.expressions.BuiltInFunctionDefinitions#ORDER_ASC} or + * {@link org.apache.flink.table.expressions.BuiltInFunctionDefinitions#ORDER_DESC} it does nothing, otherwise + * it inserts {@link org.apache.flink.table.expressions.BuiltInFunctionDefinitions#ORDER_ASC}. + */ +@Internal +final class WrapInOrderRule implements ResolverRule { Review comment: This logic belongs more to the tree builder than expression resolver. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273049948 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java ## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.GroupWindow; +import org.apache.flink.table.api.OverWindow; +import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias; +import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.lookups.FieldReferenceLookup; +import org.apache.flink.table.expressions.lookups.OperationFieldLookup; +import org.apache.flink.table.expressions.lookups.TableReferenceLookup; +import org.apache.flink.table.expressions.rules.ResolverRule; +import org.apache.flink.table.expressions.rules.ResolverRules; +import org.apache.flink.table.operations.TableOperation; +import org.apache.flink.table.plan.logical.LogicalOverWindow; +import org.apache.flink.table.plan.logical.LogicalWindow; +import org.apache.flink.table.plan.logical.SessionGroupWindow; +import org.apache.flink.table.plan.logical.SlidingGroupWindow; +import org.apache.flink.table.plan.logical.TumblingGroupWindow; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import scala.Some; + +import static java.util.stream.Collectors.toList; + +/** + * Tries to resolve all unresolved expressions such as {@link UnresolvedReferenceExpression} + * or calls such as {@link BuiltInFunctionDefinitions#OVER}. + * + * The default set of rules ({@link ExpressionResolverBuilder#getDefaultRules()}) will resolve following references: + * + * flatten '*' to all fields of underlying inputs + * join over aggregates with corresponding over windows into a single resolved call + * resolve remaining unresolved references to fields, tables or local references + * replace call to {@link BuiltInFunctionDefinitions#FLATTEN} + * performs call arguments types validation and inserts additional casts if possible + * + */ +public class ExpressionResolver { + + private final PlannerExpressionConverter bridgeConverter = PlannerExpressionConverter.INSTANCE(); + + private final FieldReferenceLookup fieldLookup; + + private final TableReferenceLookup tableLookup; + + //TODO change to LocalReferenceLookup, once we don't need to resolve fields to create LocalReferenceExpression + private final Map localReferences = new HashMap<>(); + + private final Map overWindows; + + private final Function, List> resolveFunction; + + private ExpressionResolver( + TableReferenceLookup tableLookup, + FieldReferenceLookup fieldLookup, + List overWindows, + @Nullable GroupWindow groupWindow, + List rules) { + this.tableLookup = Preconditions.checkNotNull(tableLookup); + this.fieldLookup = Preconditions.checkNotNull(fieldLookup); + this.resolveFunction = concatenateRules(rules); + + this.overWindows = prepareOverWindows(overWindows); + prepareLocalReferencesFromGroupWindows(groupWindow); + } + + /** +* Creates a builder for {@link ExpressionResolver}. One can add additional properties to the resolver +* like e.g. {@link GroupWindow} or {@link OverWindow}. You can also
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273100967 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/StarReferenceFlatteningRule.java ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions.rules; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.UnresolvedReferenceExpression; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static java.util.Collections.singletonList; + +/** + * Replaces '*' with all available {@link org.apache.flink.table.expressions.FieldReferenceExpression}s from underlying + * inputs. + */ +@Internal +final class StarReferenceFlatteningRule implements ResolverRule { + + @Override + public List apply(List expression, ResolutionContext context) { + return expression.stream() + .flatMap(expr -> expr.accept(new FieldFlatteningVisitor(context)).stream()) + .collect(Collectors.toList()); + } + + private static class FieldFlatteningVisitor extends ApiExpressionDefaultVisitor> { + + private final ResolutionContext resolutionContext; Review comment: Maybe introduce a helper class to reduce a bit of code duplication for every rule visitor. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273117051 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala ## @@ -18,22 +18,25 @@ package org.apache.flink.table.plan.logical +import java.util.Optional +import java.util.{List => JList} + import org.apache.flink.table.api.{BatchTableEnvironment, StreamTableEnvironment, TableEnvironment} import org.apache.flink.table.expressions.PlannerExpressionUtils.{isRowCountLiteral, isRowtimeAttribute, isTimeAttribute, isTimeIntervalLiteral} import org.apache.flink.table.expressions._ -import org.apache.flink.table.typeutils.TypeCheckUtils.{isTimePoint, isLong} +import org.apache.flink.table.typeutils.TypeCheckUtils.{isLong, isTimePoint} import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} // // Over windows // case class LogicalOverWindow( Review comment: Make this file a POJO in `o.a.f.t.operations` in a separate commit? We need some representation of windows as output of the API for the planner. Must not necessarily containing expressions only. Or only resolved expressions. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273119314 ## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala ## @@ -47,67 +47,60 @@ class CalcValidationTest extends TableTestBase { .select('w.end.rowtime, 'int.count as 'int) // no rowtime on non-window reference } - @Test(expected = classOf[TableException]) + @Test(expected = classOf[ValidationException]) def testAddColumnsWithAgg(): Unit = { val util = streamTestUtil() val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c) tab.addColumns('a.sum) } - @Test(expected = classOf[TableException]) + @Test(expected = classOf[ValidationException]) def testAddOrReplaceColumnsWithAgg(): Unit = { val util = streamTestUtil() val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c) tab.addOrReplaceColumns('a.sum) } - @Test(expected = classOf[TableException]) + @Test(expected = classOf[ValidationException]) def testRenameColumnsWithAgg(): Unit = { val util = streamTestUtil() val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c) tab.renameColumns('a.sum) } - @Test(expected = classOf[TableException]) + @Test(expected = classOf[ValidationException]) def testRenameColumnsWithoutAlias(): Unit = { val util = streamTestUtil() val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c) tab.renameColumns('a) } - @Test(expected = classOf[TableException]) + @Test(expected = classOf[ValidationException]) def testRenameColumnsWithFunctallCall(): Unit = { val util = streamTestUtil() val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c) tab.renameColumns('a + 1 as 'a2) } - @Test(expected = classOf[TableException]) + @Test(expected = classOf[ValidationException]) def testRenameColumnsNotExist(): Unit = { val util = streamTestUtil() val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c) tab.renameColumns('e as 'e2) } - @Test(expected = classOf[TableException]) + @Test(expected = classOf[ValidationException]) def testDropColumnsWithAgg(): Unit = { val util = streamTestUtil() val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c) tab.dropColumns('a.sum) } - @Test(expected = classOf[TableException]) Review comment: Isn't this test still valid? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273052867 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/lookups/FieldReferenceLookup.java ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions.lookups; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.expressions.FieldReferenceExpression; + +import java.util.List; +import java.util.Optional; + +/** + * Provides a way to look up field reference by the name of the field. + */ +@Internal +public interface FieldReferenceLookup { Review comment: There is just one implementation. I would prefer merging this internal interface and class. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273052519 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java ## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.GroupWindow; +import org.apache.flink.table.api.OverWindow; +import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias; +import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.lookups.FieldReferenceLookup; +import org.apache.flink.table.expressions.lookups.OperationFieldLookup; +import org.apache.flink.table.expressions.lookups.TableReferenceLookup; +import org.apache.flink.table.expressions.rules.ResolverRule; +import org.apache.flink.table.expressions.rules.ResolverRules; +import org.apache.flink.table.operations.TableOperation; +import org.apache.flink.table.plan.logical.LogicalOverWindow; +import org.apache.flink.table.plan.logical.LogicalWindow; +import org.apache.flink.table.plan.logical.SessionGroupWindow; +import org.apache.flink.table.plan.logical.SlidingGroupWindow; +import org.apache.flink.table.plan.logical.TumblingGroupWindow; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import scala.Some; + +import static java.util.stream.Collectors.toList; + +/** + * Tries to resolve all unresolved expressions such as {@link UnresolvedReferenceExpression} + * or calls such as {@link BuiltInFunctionDefinitions#OVER}. + * + * The default set of rules ({@link ExpressionResolverBuilder#getDefaultRules()}) will resolve following references: + * + * flatten '*' to all fields of underlying inputs + * join over aggregates with corresponding over windows into a single resolved call + * resolve remaining unresolved references to fields, tables or local references + * replace call to {@link BuiltInFunctionDefinitions#FLATTEN} + * performs call arguments types validation and inserts additional casts if possible + * + */ +public class ExpressionResolver { + + private final PlannerExpressionConverter bridgeConverter = PlannerExpressionConverter.INSTANCE(); + + private final FieldReferenceLookup fieldLookup; + + private final TableReferenceLookup tableLookup; + + //TODO change to LocalReferenceLookup, once we don't need to resolve fields to create LocalReferenceExpression + private final Map localReferences = new HashMap<>(); + + private final Map overWindows; + + private final Function, List> resolveFunction; + + private ExpressionResolver( + TableReferenceLookup tableLookup, + FieldReferenceLookup fieldLookup, + List overWindows, + @Nullable GroupWindow groupWindow, + List rules) { + this.tableLookup = Preconditions.checkNotNull(tableLookup); + this.fieldLookup = Preconditions.checkNotNull(fieldLookup); + this.resolveFunction = concatenateRules(rules); + + this.overWindows = prepareOverWindows(overWindows); + prepareLocalReferencesFromGroupWindows(groupWindow); + } + + /** +* Creates a builder for {@link ExpressionResolver}. One can add additional properties to the resolver +* like e.g. {@link GroupWindow} or {@link OverWindow}. You can also
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273108596 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/OperationTreeBuilder.scala ## @@ -18,16 +18,22 @@ package org.apache.flink.table.plan Review comment: nit: move to `o.a.f.t.operations` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273117168 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ## @@ -41,36 +41,17 @@ import org.apache.flink.table.validate.{ValidationFailure, ValidationSuccess} import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.tools.nsc.interpreter.JList Review comment: wrong import This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273049675 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java ## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.GroupWindow; +import org.apache.flink.table.api.OverWindow; +import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias; +import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.lookups.FieldReferenceLookup; +import org.apache.flink.table.expressions.lookups.OperationFieldLookup; +import org.apache.flink.table.expressions.lookups.TableReferenceLookup; +import org.apache.flink.table.expressions.rules.ResolverRule; +import org.apache.flink.table.expressions.rules.ResolverRules; +import org.apache.flink.table.operations.TableOperation; +import org.apache.flink.table.plan.logical.LogicalOverWindow; +import org.apache.flink.table.plan.logical.LogicalWindow; +import org.apache.flink.table.plan.logical.SessionGroupWindow; +import org.apache.flink.table.plan.logical.SlidingGroupWindow; +import org.apache.flink.table.plan.logical.TumblingGroupWindow; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import scala.Some; + +import static java.util.stream.Collectors.toList; + +/** + * Tries to resolve all unresolved expressions such as {@link UnresolvedReferenceExpression} + * or calls such as {@link BuiltInFunctionDefinitions#OVER}. + * + * The default set of rules ({@link ExpressionResolverBuilder#getDefaultRules()}) will resolve following references: + * + * flatten '*' to all fields of underlying inputs + * join over aggregates with corresponding over windows into a single resolved call + * resolve remaining unresolved references to fields, tables or local references + * replace call to {@link BuiltInFunctionDefinitions#FLATTEN} + * performs call arguments types validation and inserts additional casts if possible + * + */ +public class ExpressionResolver { + + private final PlannerExpressionConverter bridgeConverter = PlannerExpressionConverter.INSTANCE(); + + private final FieldReferenceLookup fieldLookup; + + private final TableReferenceLookup tableLookup; + + //TODO change to LocalReferenceLookup, once we don't need to resolve fields to create LocalReferenceExpression + private final Map localReferences = new HashMap<>(); + + private final Map overWindows; + + private final Function, List> resolveFunction; + + private ExpressionResolver( + TableReferenceLookup tableLookup, + FieldReferenceLookup fieldLookup, + List overWindows, + @Nullable GroupWindow groupWindow, + List rules) { + this.tableLookup = Preconditions.checkNotNull(tableLookup); + this.fieldLookup = Preconditions.checkNotNull(fieldLookup); + this.resolveFunction = concatenateRules(rules); + + this.overWindows = prepareOverWindows(overWindows); + prepareLocalReferencesFromGroupWindows(groupWindow); + } + + /** +* Creates a builder for {@link ExpressionResolver}. One can add additional properties to the resolver +* like e.g. {@link GroupWindow} or {@link OverWindow}. You can also
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273102653 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRules.java ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions.rules; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.expressions.UnresolvedReferenceExpression; + +/** + * Contains instances of {@link ResolverRule}. + */ +@PublicEvolving +public final class ResolverRules { + + /** +* Rule that resolves flatten call. See {@link FlattenCallRule} for details. +*/ + public static final ResolverRule FLATTEN_CALL = new FlattenCallRule(); + + /** +* Resolves {@link UnresolvedReferenceExpression}. See {@link ReferenceResolverRule} for details. +*/ + public static final ResolverRule FIELD_RESOLVE = new ReferenceResolverRule(); + + /** +* Resolves call based on argument types. See {@link ResolveCallByArgumentsRule} for details. +*/ + public static final ResolverRule RESOLVE_CALL = new ResolveCallByArgumentsRule(); + + /** +* Concatenates over aggregations with corresponding over window. See {@link OverWindowResolverRule} for details. +*/ + public static final ResolverRule OVER_WINDOWS = new OverWindowResolverRule(); + + /** +* Resolves '*' expressions to corresponding fields of inputs. See {@link StarReferenceFlatteningRule} for details. +*/ + public static final ResolverRule FLATTEN_STAR_REFERENCE = new StarReferenceFlatteningRule(); + + /* + NON DEFAULT RULES +*/ + + /** +* Used in sort operation. It assures expression is wrapped in ordering expression. See {@link WrapInOrderRule} +* for details. +*/ + public static final ResolverRule WRAP_IN_ORDER = new WrapInOrderRule(); + + /** +* Used in projection operation. It derives name for expression. See {@link NameExpressionRule} for details. +*/ + public static final ResolverRule NAME_EXPRESSION = new NameExpressionRule(); Review comment: This rule is more projection related and should be put into the projection builder. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273070489 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions.rules; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor; +import org.apache.flink.table.expressions.BuiltInFunctionDefinitions; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.CurrentRange; +import org.apache.flink.table.expressions.CurrentRow; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.PlannerExpression; +import org.apache.flink.table.plan.logical.LogicalOverWindow; +import org.apache.flink.table.typeutils.RowIntervalTypeInfo; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static java.util.Arrays.asList; +import static java.util.stream.Collectors.toList; + +/** + * Joins call to {@link BuiltInFunctionDefinitions#OVER} with corresponding over window + * and creates a fully resolved over aggregation. + */ +@Internal +final class OverWindowResolverRule implements ResolverRule { + + @Override + public List apply(List expression, ResolutionContext context) { + return expression.stream() + .map(expr -> expr.accept(new ExpressionResolverVisitor(context))) + .collect(Collectors.toList()); + } + + private class ExpressionResolverVisitor extends ApiExpressionDefaultVisitor { + + private final ResolutionContext context; + + ExpressionResolverVisitor(ResolutionContext context) { + this.context = context; + } + + @Override + public Expression visitCall(CallExpression call) { + + if (call.getFunctionDefinition() == BuiltInFunctionDefinitions.OVER) { + List children = call.getChildren(); + Expression alias = children.get(1); + + LogicalOverWindow referenceWindow = context.getOverWindow(alias) + .orElseThrow(() -> new ValidationException("Could not resolve over call.")); + + Expression following = calculateOverWindowFollowing(referenceWindow); + List newArgs = new ArrayList<>(asList( + children.get(0), + referenceWindow.orderBy(), + referenceWindow.preceding(), + following)); + + newArgs.addAll(referenceWindow.partitionBy()); + return new CallExpression(call.getFunctionDefinition(), newArgs); + } else { + return new CallExpression( + call.getFunctionDefinition(), + call.getChildren().stream().map(expr -> expr.accept(this)).collect(toList())); + } + } + + private Expression calculateOverWindowFollowing(LogicalOverWindow referenceWindow) { + return referenceWindow.following().orElseGet(() -> { + PlannerExpression preceding = context.bridge(referenceWindow.preceding()); + if (preceding.resultType() instanceof RowIntervalTypeInfo) { + return new CurrentRow(); Review comment: Change to a call instead of planner expressions. This is a
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273106216 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ## @@ -42,8 +42,11 @@ import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, Tabl import org.apache.flink.table.plan.logical._ import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl import org.apache.flink.util.InstantiationUtil +import java.util.{Optional, List => JList} Review comment: Unrelated changes? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273052322 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java ## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.GroupWindow; +import org.apache.flink.table.api.OverWindow; +import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias; +import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.lookups.FieldReferenceLookup; +import org.apache.flink.table.expressions.lookups.OperationFieldLookup; +import org.apache.flink.table.expressions.lookups.TableReferenceLookup; +import org.apache.flink.table.expressions.rules.ResolverRule; +import org.apache.flink.table.expressions.rules.ResolverRules; +import org.apache.flink.table.operations.TableOperation; +import org.apache.flink.table.plan.logical.LogicalOverWindow; +import org.apache.flink.table.plan.logical.LogicalWindow; +import org.apache.flink.table.plan.logical.SessionGroupWindow; +import org.apache.flink.table.plan.logical.SlidingGroupWindow; +import org.apache.flink.table.plan.logical.TumblingGroupWindow; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import scala.Some; + +import static java.util.stream.Collectors.toList; + +/** + * Tries to resolve all unresolved expressions such as {@link UnresolvedReferenceExpression} + * or calls such as {@link BuiltInFunctionDefinitions#OVER}. + * + * The default set of rules ({@link ExpressionResolverBuilder#getDefaultRules()}) will resolve following references: + * + * flatten '*' to all fields of underlying inputs + * join over aggregates with corresponding over windows into a single resolved call + * resolve remaining unresolved references to fields, tables or local references + * replace call to {@link BuiltInFunctionDefinitions#FLATTEN} + * performs call arguments types validation and inserts additional casts if possible + * + */ +public class ExpressionResolver { + + private final PlannerExpressionConverter bridgeConverter = PlannerExpressionConverter.INSTANCE(); + + private final FieldReferenceLookup fieldLookup; + + private final TableReferenceLookup tableLookup; + + //TODO change to LocalReferenceLookup, once we don't need to resolve fields to create LocalReferenceExpression + private final Map localReferences = new HashMap<>(); + + private final Map overWindows; + + private final Function, List> resolveFunction; + + private ExpressionResolver( + TableReferenceLookup tableLookup, + FieldReferenceLookup fieldLookup, + List overWindows, + @Nullable GroupWindow groupWindow, + List rules) { + this.tableLookup = Preconditions.checkNotNull(tableLookup); + this.fieldLookup = Preconditions.checkNotNull(fieldLookup); + this.resolveFunction = concatenateRules(rules); + + this.overWindows = prepareOverWindows(overWindows); + prepareLocalReferencesFromGroupWindows(groupWindow); + } + + /** +* Creates a builder for {@link ExpressionResolver}. One can add additional properties to the resolver +* like e.g. {@link GroupWindow} or {@link OverWindow}. You can also
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273120072 ## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/OverWindowValidationTest.scala ## @@ -148,6 +148,6 @@ class OverWindowValidationTest extends TableTestBase { table .window(Over orderBy 'rowtime preceding 1.minutes as 'w) -.select('c, 'a.count over 'w, 'w.start, 'w.end) +.select('c, 'a.count over 'w, 'w.start + 1, 'w.end + 2) Review comment: Unrelated change? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273101909 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRules.java ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions.rules; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.expressions.UnresolvedReferenceExpression; + +/** + * Contains instances of {@link ResolverRule}. + */ +@PublicEvolving Review comment: `@Internal` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273017053 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java ## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.GroupWindow; +import org.apache.flink.table.api.OverWindow; +import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias; +import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.lookups.FieldReferenceLookup; +import org.apache.flink.table.expressions.lookups.OperationFieldLookup; +import org.apache.flink.table.expressions.lookups.TableReferenceLookup; +import org.apache.flink.table.expressions.rules.ResolverRule; +import org.apache.flink.table.expressions.rules.ResolverRules; +import org.apache.flink.table.operations.TableOperation; +import org.apache.flink.table.plan.logical.LogicalOverWindow; +import org.apache.flink.table.plan.logical.LogicalWindow; +import org.apache.flink.table.plan.logical.SessionGroupWindow; +import org.apache.flink.table.plan.logical.SlidingGroupWindow; +import org.apache.flink.table.plan.logical.TumblingGroupWindow; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import scala.Some; + +import static java.util.stream.Collectors.toList; + +/** + * Tries to resolve all unresolved expressions such as {@link UnresolvedReferenceExpression} + * or calls such as {@link BuiltInFunctionDefinitions#OVER}. + * + * The default set of rules ({@link ExpressionResolverBuilder#getDefaultRules()}) will resolve following references: + * + * flatten '*' to all fields of underlying inputs + * join over aggregates with corresponding over windows into a single resolved call + * resolve remaining unresolved references to fields, tables or local references + * replace call to {@link BuiltInFunctionDefinitions#FLATTEN} + * performs call arguments types validation and inserts additional casts if possible + * + */ +public class ExpressionResolver { Review comment: `@Internal` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273033445 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java ## @@ -78,12 +83,12 @@ public static SymbolExpression symbol(TableSymbol symbol) { return new SymbolExpression(symbol); } - public static UnresolvedFieldReferenceExpression unresolvedFieldRef(String name) { - return new UnresolvedFieldReferenceExpression(name); + public static UnresolvedReferenceExpression unresolvedFieldRef(String name) { + return new UnresolvedReferenceExpression(name); } public static TableReferenceExpression tableRef(String name, Table table) { Review comment: Yes, maybe we can make this nicer once we rework table environments and how the interplay with catalogs. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273025008 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/lookups/OperationFieldLookup.java ## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions.lookups; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.operations.TableOperation; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.util.stream.Collectors.toList; + +/** + * Implementation of {@link FieldReferenceLookup} that gives access to fields of given {@link TableOperation}s. + */ +@Internal +public class OperationFieldLookup implements FieldReferenceLookup { Review comment: Merge interface and class into one? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r273004862 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java ## @@ -83,6 +84,11 @@ */ void printSchema(); + /** +* Returns underlying logical representation of this table. +*/ + TableOperation getTableOperation(); Review comment: This method is definitely worth discussing because it fits better to Impl. On the otherhand this will be the output of the API and we don't need a lot of casting at different positions. We will mark this method as internal and can still remove it from the interface in the future. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r272965435 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala ## @@ -232,7 +232,7 @@ object PlannerExpressionParserImpl extends JavaTokenParsers lazy val literalExpr: PackratParser[Expression] = numberLiteral | doubleQuoteStringLiteral | singleQuoteStringLiteral | boolLiteral - lazy val fieldReference: PackratParser[UnresolvedFieldReferenceExpression] = (STAR | ident) ^^ { + lazy val fieldReference: PackratParser[UnresolvedReferenceExpression] = (STAR | ident) ^^ { sym => unresolvedFieldRef(sym) Review comment: Please also update the API util class. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions
twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r272964598 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java ## @@ -78,12 +83,12 @@ public static SymbolExpression symbol(TableSymbol symbol) { return new SymbolExpression(symbol); } - public static UnresolvedFieldReferenceExpression unresolvedFieldRef(String name) { - return new UnresolvedFieldReferenceExpression(name); + public static UnresolvedReferenceExpression unresolvedFieldRef(String name) { + return new UnresolvedReferenceExpression(name); } public static TableReferenceExpression tableRef(String name, Table table) { Review comment: True, registering the table under the hood is not perfect design. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services