[
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15278198#comment-15278198
]
ASF GitHub Bot commented on FLINK-3754:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1958#discussion_r62683921
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment,
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode)
extends UnaryNode { // logical node that projects `projectList` over `child`
+ override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+ // Resolves expressions via super, then gives every UnresolvedAlias entry of the project list a name.
+ override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode
= {
+ val afterResolve =
super.resolveExpressions(tableEnv).asInstanceOf[Project]
+ val newProjectList =
+ afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+ e match {
+ case u @ UnresolvedAlias(child, optionalAliasName) => child
match {
+ case ne: NamedExpression => ne // already named: use it directly
+ case e if !e.valid => u // wrapped expression is not valid: keep the UnresolvedAlias unchanged
+ case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name) // a cast is named after the expression it casts
+ case other => Alias(other,
optionalAliasName.getOrElse(s"_c$i")) // explicit alias if given, else positional name built from index i
+ }
+ case _ => throw new IllegalArgumentException // entries that are not UnresolvedAlias are rejected
+ }
+ }
+ Project(newProjectList, child) // `child` is this node's field here; the pattern variable of the same name is out of scope
+ }
+ // Translates the child, then adds this projection on top of it in relBuilder.
+ override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+ val allAlias = projectList.forall(_.isInstanceOf[Alias]) // true when every projected expression is an Alias
+ child.toRelNode(relBuilder) // return value unused; presumably leaves the child's plan on relBuilder (peek() below relies on it) - confirm
+ if (allAlias) {
+ // Calcite's RelBuilder does not translate identity projects even if
they rename fields.
+ // Add a projection ourselves (will be automatically removed by
translation rules).
+ relBuilder.push(
+ LogicalProject.create(relBuilder.peek(),
+ projectList.map(_.toRexNode(relBuilder)).asJava,
+ projectList.map(_.name).asJava))
+ } else {
+ relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*) // otherwise RelBuilder builds the projection itself
+ }
+ }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode)
extends UnaryNode { // renames leading fields of `child`; resolveExpressions turns it into a Project
+ override def output: Seq[Attribute] =
+ throw new UnresolvedException("Invalid call to output on AliasNode")
+ // Not translatable directly: always throws, like `output` above.
+ override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+ throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+ // Checks the alias list against the child's output and builds the equivalent Project.
+ override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode
= {
+ if (aliasList.length > child.output.length) { // more aliases than the child has fields
+ failValidation("Aliasing more fields than we actually have")
+ } else if
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) { // every alias must be an UnresolvedFieldReference
+ failValidation("`as` only allow string arguments")
+ } else {
+ val names =
aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name) // cast is safe: checked by the forall above
+ val input = child.output
+ Project(
+ names.zip(input).map { case (name, attr) =>
+ Alias(attr, name)} ++ input.drop(names.length), child) // fields beyond the alias list keep their original attributes
+ }
+ }
+}
+
+case class Distinct(child: LogicalNode) extends UnaryNode { // logical node that applies relBuilder.distinct() on top of `child`
+ override def output: Seq[Attribute] = child.output
+ // Translates the child, then calls distinct() on the same relBuilder.
+ override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+ child.toRelNode(relBuilder) // return value unused; presumably leaves the child's plan on relBuilder - confirm
+ relBuilder.distinct()
+ }
+ // Reports a validation failure when used with a StreamTableEnvironment; otherwise returns this node unchanged.
+ override def validate(tableEnv: TableEnvironment): LogicalNode = {
+ if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+ failValidation(s"Distinct on stream tables is currently not
supported.")
+ }
+ this
+ }
+}
+
+case class Sort(order: Seq[Ordering], child: LogicalNode) extends
UnaryNode {
--- End diff --
Can you also check in `Sort` that it is not used on streaming tables?
> Add a validation phase before construct RelNode using TableAPI
> --------------------------------------------------------------
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
> Issue Type: Improvement
> Components: Table API
> Affects Versions: 1.0.0
> Reporter: Yijie Shen
> Assignee: Yijie Shen
>
> Unlike SQL string execution, which has a separate validation phase before
> RelNode construction, the Table API lacks a counterpart, and its validation is
> scattered in many places.
> I suggest adding a single validation phase to detect problems as early as
> possible.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)