Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3665#discussion_r109856293
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
 ---
    @@ -117,46 +119,86 @@ class LogicalWindowAggregateRule
       }
     
       private def identifyWindow(field: RexNode): Option[Window] = {
    -    // Detects window expressions by pattern matching
    -    //   supported patterns: FLOOR(time AS xxx) and CEIL(time AS xxx),
    -    //   with time being equal to proctime() or rowtime()
         field match {
           case call: RexCall =>
             call.getOperator match {
    -          case _: SqlFloorFunction =>
    -            val operand = call.getOperands.get(1).asInstanceOf[RexLiteral]
    -            val unit: TimeUnitRange = 
operand.getValue.asInstanceOf[TimeUnitRange]
    -            val w = 
LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit)
    -            call.getType match {
    -              case TimeModeTypes.PROCTIME =>
    -                return Some(w)
    -              case TimeModeTypes.ROWTIME =>
    -                return Some(w.on("rowtime"))
    -              case _ =>
    -            }
    -          case _ =>
    +          case _: SqlFloorFunction => FloorWindowTranslator(call).toWindow
    +          case SqlStdOperatorTable.TUMBLE => 
TumbleWindowTranslator(call).toWindow
    +          case SqlStdOperatorTable.HOP => 
SlidingWindowTranslator(call).toWindow
    +          case SqlStdOperatorTable.SESSION => 
SessionWindowTranslator(call).toWindow
    +          case _ => None
             }
    -      case _ =>
    +      case _ => None
         }
    -    None
       }
    -
     }
     
    -object LogicalWindowAggregateRule {
    +private abstract class WindowTranslator {
    +  val call: RexCall
     
    -  private[flink] val LOGICAL_WINDOW_PREDICATE = 
RelOptRule.operand(classOf[LogicalAggregate],
    -    RelOptRule.operand(classOf[LogicalProject], RelOptRule.none()))
    +  protected def unwrapLiteral[T](node: RexNode): T =
    +    node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T]
     
    -  private[flink] val INSTANCE = new LogicalWindowAggregateRule
    +  protected def getOperandAsLong(idx: Int): Long =
    +    unwrapLiteral[BigDecimal](call.getOperands.get(idx)).longValue()
    --- End diff --
    
    Flink only supports windows with a fixed configuration (SESSION windows 
have variable length, but the gap parameter is fixed). I'm also not sure 
whether supporting variable window parameters would make sense. It's quite 
hard to reason about the behavior of a window with variable parameters, IMO.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA
ticket with INFRA.
---

Reply via email to