[ 
https://issues.apache.org/jira/browse/FLINK-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16036782#comment-16036782
 ] 

ASF GitHub Bot commented on FLINK-6834:
---------------------------------------

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4070#discussion_r120081629
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ---
    @@ -226,30 +226,69 @@ object ProjectionTranslator {
           overWindows: Array[OverWindow],
           tEnv: TableEnvironment): Seq[Expression] = {
     
    -    def resolveOverWindow(unresolvedCall: UnresolvedOverCall): Expression 
= {
    -
    -      val overWindow = 
overWindows.find(_.alias.equals(unresolvedCall.alias))
    -      if (overWindow.isDefined) {
    -        OverCall(
    -          unresolvedCall.agg,
    -          overWindow.get.partitionBy,
    -          overWindow.get.orderBy,
    -          overWindow.get.preceding,
    -          overWindow.get.following)
    -      } else {
    -        unresolvedCall
    -      }
    -    }
    +    exprs.map(e => replaceOverCall(e, overWindows, tEnv))
    +  }
     
    -    val projectList = new ListBuffer[Expression]
    -    exprs.foreach {
    -      case Alias(u: UnresolvedOverCall, name, _) =>
    -        projectList += Alias(resolveOverWindow(u), name)
    +  /**
    +    * Find and replace UnresolvedOverCall with OverCall
    +    *
    +    * @param expr    the expression to check
    +    * @return an expression with correct resolved OverCall
    +    */
    +  private def replaceOverCall(
    +    expr: Expression,
    +    overWindows: Array[OverWindow],
    +    tableEnv: TableEnvironment): Expression = {
    +
    +    expr match {
           case u: UnresolvedOverCall =>
    -        projectList += resolveOverWindow(u)
    -      case e: Expression => projectList += e
    +        val overWindow = overWindows.find(_.alias.equals(u.alias))
    +        if (overWindow.isDefined) {
    +          OverCall(
    +            u.agg,
    +            overWindow.get.partitionBy,
    +            overWindow.get.orderBy,
    +            overWindow.get.preceding,
    +            overWindow.get.following)
    +        } else {
    +          u
    +        }
    +
    +      case u: UnaryExpression =>
    +        val c = replaceOverCall(u.child, overWindows, tableEnv)
    +        u.makeCopy(Array(c))
    +
    +      case b: BinaryExpression =>
    +        val l = replaceOverCall(b.left, overWindows, tableEnv)
    +        val r = replaceOverCall(b.right, overWindows, tableEnv)
    +        b.makeCopy(Array(l, r))
    +
    +      // Functions calls
    +      case c @ Call(name, args: Seq[Expression]) =>
    +        val newArgs =
    +          args.map(
    +            (exp: Expression) =>
    +              replaceOverCall(exp, overWindows, tableEnv))
    +        c.makeCopy(Array(name, newArgs))
    +
    +      // Scala functions
    +      case sfc @ ScalarFunctionCall(clazz, args: Seq[Expression]) =>
    +        val newArgs: Seq[Expression] =
    +          args.map(
    +            (exp: Expression) =>
    +              replaceOverCall(exp, overWindows, tableEnv))
    +        sfc.makeCopy(Array(clazz, newArgs))
    +
    +      // Array constructor
    +      case c @ ArrayConstructor(args) =>
    --- End diff --
    
    Can you explain in which cases `ArrayConstructor` is used here?


> Over window doesn't support complex calculation
> -----------------------------------------------
>
>                 Key: FLINK-6834
>                 URL: https://issues.apache.org/jira/browse/FLINK-6834
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>             Fix For: 1.4.0
>
>
> The following example
> {code}
> val windowedTable = table
>       .window(
>         Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
>       .select('c, 'b, ('a.count over 'w) + 1)
> {code}
> will throw an exception: 
> {code}
> org.apache.flink.table.api.ValidationException: Expression 
> UnresolvedOverCall(count('a),'w) failed on input check: Over window with 
> alias $alias could not be resolved.
>       at 
> org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:143)
>       at 
> org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:89)
>       at 
> org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:83)
>       at 
> org.apache.flink.table.plan.TreeNode.postOrderTransform(TreeNode.scala:72)
>       at 
> org.apache.flink.table.plan.TreeNode$$anonfun$1.apply(TreeNode.scala:46)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to