[ 
https://issues.apache.org/jira/browse/FLINK-6011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954915#comment-15954915
 ] 

ASF GitHub Bot commented on FLINK-6011:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3665#discussion_r109616209
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
 ---
    @@ -117,46 +119,86 @@ class LogicalWindowAggregateRule
       }
     
       private def identifyWindow(field: RexNode): Option[Window] = {
    -    // Detects window expressions by pattern matching
    -    //   supported patterns: FLOOR(time AS xxx) and CEIL(time AS xxx),
    -    //   with time being equal to proctime() or rowtime()
         field match {
           case call: RexCall =>
             call.getOperator match {
    -          case _: SqlFloorFunction =>
    -            val operand = call.getOperands.get(1).asInstanceOf[RexLiteral]
    -            val unit: TimeUnitRange = 
operand.getValue.asInstanceOf[TimeUnitRange]
    -            val w = 
LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit)
    -            call.getType match {
    -              case TimeModeTypes.PROCTIME =>
    -                return Some(w)
    -              case TimeModeTypes.ROWTIME =>
    -                return Some(w.on("rowtime"))
    -              case _ =>
    -            }
    -          case _ =>
    +          case _: SqlFloorFunction => FloorWindowTranslator(call).toWindow
    +          case SqlStdOperatorTable.TUMBLE => 
TumbleWindowTranslator(call).toWindow
    +          case SqlStdOperatorTable.HOP => 
SlidingWindowTranslator(call).toWindow
    +          case SqlStdOperatorTable.SESSION => 
SessionWindowTranslator(call).toWindow
    +          case _ => None
             }
    -      case _ =>
    +      case _ => None
         }
    -    None
       }
    -
     }
     
    -object LogicalWindowAggregateRule {
    +private abstract class WindowTranslator {
    +  val call: RexCall
     
    -  private[flink] val LOGICAL_WINDOW_PREDICATE = 
RelOptRule.operand(classOf[LogicalAggregate],
    -    RelOptRule.operand(classOf[LogicalProject], RelOptRule.none()))
    +  protected def unwrapLiteral[T](node: RexNode): T =
    +    node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T]
     
    -  private[flink] val INSTANCE = new LogicalWindowAggregateRule
    +  protected def getOperandAsLong(idx: Int): Long =
    +    unwrapLiteral[BigDecimal](call.getOperands.get(idx)).longValue()
    --- End diff --
    
    Does Calcite ensure that the operands can only be literals, never input references?


> Support TUMBLE, HOP, SESSION window in streaming SQL
> ----------------------------------------------------
>
>                 Key: FLINK-6011
>                 URL: https://issues.apache.org/jira/browse/FLINK-6011
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Haohui Mai
>            Assignee: Haohui Mai
>
> CALCITE-1603 and CALCITE-1615 introduce support for the {{TUMBLE}} / 
> {{HOP}} / {{SESSION}} windows in the parser.
> This JIRA issue tracks the effort of adding the corresponding support to the 
> planners / optimizers in Flink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to