[ https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829606#comment-15829606 ]
ASF GitHub Bot commented on FLINK-4693: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3150#discussion_r96821333 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala --- @@ -218,6 +216,85 @@ class DataSetWindowAggregate( } } + private[this] def createEventTimeSessionWindowDataSet( + inputDS: DataSet[Any], + isParserCaseSensitive: Boolean): DataSet[Any] = { + + val groupingKeys = grouping.indices.toArray + val rowTypeInfo = resultRowTypeInfo + + // grouping window + if (groupingKeys.length > 0) { + //create mapFunction for initializing the aggregations + val mapFunction = createDataSetWindowPrepareMapFunction( + window, + namedAggregates, + grouping, + inputType,isParserCaseSensitive) + + // create groupReduceFunction for calculating the aggregations + val groupReduceFunction = createDataSetWindowAggregationGroupReduceFunction( + window, + namedAggregates, + inputType, + rowRelDataType, + grouping, + namedProperties) + + val mappedInput = + inputDS + .map(mapFunction) + .name(prepareOperatorName) + + val mapReturnType = mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType + + // the position of the rowtime field in the intermediate result for map output + val rowTimeFilePos = mapReturnType.getArity - 1 --- End diff -- should be `rowTimeFieldPos`? > Add session group-windows for batch tables > ------------------------------------------- > > Key: FLINK-4693 > URL: https://issues.apache.org/jira/browse/FLINK-4693 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Timo Walther > Assignee: sunjincheng > > Add Session group-windows for batch tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)