[ 
https://issues.apache.org/jira/browse/FLINK-4679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676708#comment-15676708
 ] 

Jark Wu commented on FLINK-4679:
--------------------------------

Hi [~fhueske] [~twalthr], if I understand correctly, the row-window will 
evaluate the aggregates every time a row comes in the window. I think it is 
really like window early-fire which is controlled by Trigger. Could we 
implement some specific Trigger to fire on every element and then no custom 
stream operator needed ?  Have I missed anything?

The Row-count row-window trigger could be like this : 

{code}
class RowWindowCountTrigger[W <: Window](maxCount: Long) extends Trigger[Any, 
W] {

  val stateDesc = new ReducingStateDescriptor[JLong]("count", Sum, 
LongSerializer.INSTANCE)

  override def onElement(element: Any, timestamp: Long, window: W, ctx: 
TriggerContext)
  : TriggerResult = {

    val count: ReducingState[JLong] = ctx.getPartitionedState(stateDesc)
    count.add(1L)
    if (count.get >= maxCount) {
      count.clear()
      TriggerResult.FIRE_AND_PURGE
    } else {
      TriggerResult.FIRE
    }

  }

  override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): 
TriggerResult =
    TriggerResult.CONTINUE

  override def onEventTime(time: Long, window: W, ctx: TriggerContext): 
TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: W, ctx: TriggerContext): Unit =
    ctx.getPartitionedState(stateDesc).clear()
  

  @SerialVersionUID(1L)
  object Sum extends ReduceFunction[JLong] {
    @throws[Exception]
    def reduce(value1: JLong, value2: JLong): JLong = value1 + value2
  }
}
{code}

> Add TumbleRow row-windows for streaming tables
> ----------------------------------------------
>
>                 Key: FLINK-4679
>                 URL: https://issues.apache.org/jira/browse/FLINK-4679
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: Jark Wu
>
> Add TumbleRow row-windows for streaming tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  
> This task requires to implement a custom stream operator and integrate it 
> with checkpointing and timestamp / watermark logic.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to