[
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15278188#comment-15278188
]
ASF GitHub Bot commented on FLINK-3754:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1958#discussion_r62683029
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate._
+
+/**
+ * LogicalNode is created and validated as we construct query plan using Table API.<p>
+ *
+ * The main validation procedure is separated into two phases:<p>
+ * Expressions' resolution and transformation (#resolveExpressions(TableEnvironment)):
+ * <ul>
+ * <li>translate UnresolvedFieldReference into ResolvedFieldReference
+ * using child operator's output</li>
+ * <li>translate Call(UnresolvedFunction) into solid Expression</li>
+ * <li>generate alias names for query output</li>
+ * <li>....</li>
+ * </ul>
+ *
+ * LogicalNode validation (#validate(TableEnvironment)):
+ * <ul>
+ * <li>check no UnresolvedFieldReference exists any more</li>
+ * <li>check if all expressions have children of needed type</li>
+ * <li>check each logical operator have desired input</li>
+ * </ul>
+ * Once we pass the validation phase, we can safely convert LogicalNode into Calcite's RelNode.
+ *
+ * Note: this is adapted from Apache Spark's LogicalPlan.
+ */
+abstract class LogicalNode extends TreeNode[LogicalNode] {
+ def output: Seq[Attribute]
+
+ def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+ // resolve references and function calls
+ transformExpressionsUp {
+ case u @ UnresolvedFieldReference(name) =>
+ resolveChildren(name).getOrElse(u)
+ case c @ Call(name, children) if c.childrenValid =>
+ tableEnv.getFunctionCatalog.lookupFunction(name, children)
+ }
+ }
+
+ def toRelNode(relBuilder: RelBuilder): RelBuilder
+
+ def validate(tableEnv: TableEnvironment): LogicalNode = {
+ val resolvedNode = resolveExpressions(tableEnv)
+ resolvedNode.transformExpressionsUp {
+ case a: Attribute if !a.valid =>
+ val from = children.flatMap(_.output).map(_.name).mkString(", ")
+ failValidation(s"cannot resolve [${a.name}] given input [$from]")
+
+ case e: Expression if e.validateInput().isFailure =>
+ e.validateInput() match {
+ case ExprValidationResult.ValidationFailure(message) =>
+ failValidation(s"Expression $e failed on input check:
$message")
+ }
+ }
+ }
+
+ /**
+ * Resolves the given strings to a [[NamedExpression]] using the input from all child
+ * nodes of this LogicalPlan.
+ */
+ def resolveChildren(name: String): Option[NamedExpression] =
+ resolve(name, children.flatMap(_.output))
+
+ /**
+ * Performs attribute resolution given a name and a sequence of possible attributes.
+ */
+ def resolve(name: String, input: Seq[Attribute]):
Option[NamedExpression] = {
+ // find all matches in input
+ val candidates = input.filter(_.name.equalsIgnoreCase(name))
+ if (candidates.length > 1) {
+ failValidation(s"Reference $name is ambiguous")
+ } else if (candidates.length == 0) {
+ None
+ } else {
+ Some(candidates.head.withName(name))
+ }
+ }
+
+ def expressions: Seq[Expression] = {
--- End diff --
I think this code is not used.
> Add a validation phase before construct RelNode using TableAPI
> --------------------------------------------------------------
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
> Issue Type: Improvement
> Components: Table API
> Affects Versions: 1.0.0
> Reporter: Yijie Shen
> Assignee: Yijie Shen
>
> Unlike the execution of SQL strings, which has a separate validation phase
> before RelNode construction, the Table API lacks a counterpart, and its
> validation is scattered across many places.
> I suggest adding a single validation phase so that problems are detected as
> early as possible.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)