[ https://issues.apache.org/jira/browse/FLINK-4679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676708#comment-15676708 ]
Jark Wu commented on FLINK-4679:
--------------------------------

Hi [~fhueske] [~twalthr],

if I understand correctly, the row-window evaluates the aggregates every time a row enters the window. I think this is very similar to window early firing, which is controlled by a Trigger. Could we implement a specific Trigger that fires on every element, so that no custom stream operator is needed? Have I missed anything?

The row-count row-window trigger could look like this:

{code}
import java.lang.{Long => JLong}

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor}
import org.apache.flink.api.common.typeutils.base.LongSerializer
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.windows.Window

class RowWindowCountTrigger[W <: Window](maxCount: Long) extends Trigger[Any, W] {

  // Counter of the rows seen so far, kept in partitioned (per key and window) state.
  val stateDesc = new ReducingStateDescriptor[JLong]("count", Sum, LongSerializer.INSTANCE)

  override def onElement(element: Any, timestamp: Long, window: W, ctx: TriggerContext)
    : TriggerResult = {
    val count: ReducingState[JLong] = ctx.getPartitionedState(stateDesc)
    count.add(1L)
    if (count.get >= maxCount) {
      // The window is full: emit the result and discard the window contents.
      count.clear()
      TriggerResult.FIRE_AND_PURGE
    } else {
      // Emit the current aggregate for every row, but keep the window contents.
      TriggerResult.FIRE
    }
  }

  // This trigger is purely count-based, so timers never fire the window.
  override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: W, ctx: TriggerContext): Unit =
    ctx.getPartitionedState(stateDesc).clear()

  @SerialVersionUID(1L)
  object Sum extends ReduceFunction[JLong] {
    @throws[Exception]
    def reduce(value1: JLong, value2: JLong): JLong = value1 + value2
  }
}
{code}

> Add TumbleRow row-windows for streaming tables
> ----------------------------------------------
>
>                 Key: FLINK-4679
>                 URL: https://issues.apache.org/jira/browse/FLINK-4679
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: Jark Wu
>
> Add TumbleRow row-windows for streaming tables as described in
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>
> This task requires implementing a custom stream operator and integrating it
> with checkpointing and timestamp / watermark logic.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)