[ https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15828010#comment-15828010 ]
ASF GitHub Bot commented on FLINK-4693:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3150#discussion_r96629837

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -226,27 +225,102 @@ object AggregateUtil {
               aggregates,
               groupingOffsetMapping,
               aggOffsetMapping,
    -          intermediateRowArity,
    +          intermediateRowArity + 1,// the addition one field is used to store time attribute
               outputType.getFieldCount)
    +
    +      case EventTimeSessionGroupWindow(_, _, gap) =>
    +        val (startPos, endPos) = if (isTimeWindow(window)) {
    +          computeWindowStartEndPropertyPos(properties)
    +        } else {
    +          (None, None)
    +        }
    +        new DataSetSessionWindowAggregateReduceGroupFunction(
    +          aggregates,
    +          groupingOffsetMapping,
    +          aggOffsetMapping,
    +          // the addition two fields are used to store window-start and window-end attributes
    +          intermediateRowArity + 2,
    +          outputType.getFieldCount,
    +          startPos,
    +          endPos,
    +          asLong(gap))
           case _ =>
             throw new UnsupportedOperationException(s"$window is currently not supported on batch")
         }
       }

       /**
    +    * Create a [[org.apache.flink.api.common.functions.GroupCombineFunction]] that pre-aggregation
    +    * for aggregates.
    +    * The function returns intermediate aggregate values of all aggregate function which are
    +    * organized by the following format:
    +    *
    +    * {{{
    +    *         avg(x) aggOffsetInRow = 2   count(z) aggOffsetInRow = 5
    +    *                     |                          |          windowEnd(max(row-time)
    +    *                     |                          |                    |
    +    *                     v                          v                    v
    +    * +---------+---------+--------+--------+--------+--------+-----------+---------+
    +    * |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |windowStart|windowEnd|
    +    * +---------+---------+--------+--------+--------+--------+-----------+---------+
    +    *                                       ^                 ^
    +    *                                       |                 |
    +    *                           sum(y) aggOffsetInRow = 4  windowStart(min(row-time))
    +    *
    +    * }}}
    +    *
    +    */
    +
    --- End diff --

    Remove empty line.
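For readers following the Scaladoc in the diff above, here is a minimal sketch of how row-times could be folded into sessions so that each session carries a windowStart (minimum row-time) and the last row-time seen (maximum row-time), matching the two trailing fields of the intermediate row. The names `Session`, `SessionSketch` and `sessionize` are hypothetical and are not taken from `AggregateUtil.scala`. The boundary rule (a new session starts when the distance to the previous row-time is at least `gap`) is an assumption, and the diff does not say whether the final window end adds the gap on top of the maximum row-time, so the sketch leaves that out.

```scala
// Hypothetical illustration only; not the code under review in the pull request.
final case class Session(start: Long, lastRowtime: Long)

object SessionSketch {

  /** Groups row-times into sessions.
    *
    * Assumed rule: a row-time joins the current session if it lies strictly
    * less than `gap` after the session's last row-time; otherwise it opens
    * a new session.
    */
  def sessionize(rowtimes: Seq[Long], gap: Long): Vector[Session] =
    rowtimes.sorted.foldLeft(Vector.empty[Session]) { (acc, t) =>
      acc.lastOption match {
        // Extend the open session: start stays the minimum, lastRowtime becomes the new maximum.
        case Some(s) if t - s.lastRowtime < gap =>
          acc.init :+ s.copy(lastRowtime = t)
        // First element, or the gap was reached: open a new session.
        case _ =>
          acc :+ Session(t, t)
      }
    }
}
```

Under these assumptions, `SessionSketch.sessionize(Seq(10L, 1L, 2L), 5L)` yields two sessions, one spanning row-times 1 to 2 and one containing only 10.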
> Add session group-windows for batch tables
> -------------------------------------------
>
>                 Key: FLINK-4693
>                 URL: https://issues.apache.org/jira/browse/FLINK-4693
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: sunjincheng
>
> Add Session group-windows for batch tables as described in
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)