[ https://issues.apache.org/jira/browse/FLINK-6216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15949938#comment-15949938 ]
ASF GitHub Bot commented on FLINK-6216:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3646#discussion_r109043405

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
    @@ -0,0 +1,100 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +
    +/**
    +  * Aggregate Function used for the groupby (without window) aggregate
    +  *
    +  * @param aggregates           the list of all
    +  *                             [[org.apache.flink.table.functions.AggregateFunction]] used for
    +  *                             this aggregation
    +  * @param aggFields            the position (in the input Row) of the input value for each
    +  *                             aggregate
    +  * @param groupings            the position (in the input Row) of the grouping keys
    +  * @param aggregationStateType the row type info of aggregation
    +  */
    +class GroupAggProcessFunction(
    +    private val aggregates: Array[AggregateFunction[_]],
    +    private val aggFields: Array[Int],
    +    private val groupings: Array[Int],
    +    private val aggregationStateType: RowTypeInfo)
    +  extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +
    +  private var output: Row = _
    +  private var state: ValueState[Row] = _
    +
    +  override def open(config: Configuration) {
    +    output = new Row(groupings.length + aggregates.length)
    +    val stateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("GroupAggregateState", aggregationStateType)
    +    state = getRuntimeContext.getState(stateDescriptor)
    +  }
    +
    +  override def processElement(
    +      input: Row,
    +      ctx: ProcessFunction[Row, Row]#Context,
    +      out: Collector[Row]): Unit = {
    +
    +    var i = 0
    +
    +    var accumulators = state.value()
    +
    +    if (null == accumulators) {
    +      accumulators = new Row(aggregates.length)
    +      i = 0
    +      while (i < aggregates.length) {
    +        accumulators.setField(i, aggregates(i).createAccumulator())
    +        i += 1
    +      }
    +    }
    +
    +    // Set group keys value to the final output
    +    i = 0
    +    while (i < groupings.length) {
    +      output.setField(i, input.getField(groupings(i)))
    +      i += 1
    +    }
    +
    +    // Set aggregate (early-firing) result to the final output
    --- End diff --

    I would not call this early firing as the result will never be complete.


> DataStream unbounded groupby aggregate with early firing
> --------------------------------------------------------
>
>                 Key: FLINK-6216
>                 URL: https://issues.apache.org/jira/browse/FLINK-6216
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Shaoxuan Wang
>
> Groupby aggregate results in a replace table. For infinite groupby aggregate,
> we need a mechanism to define when the data should be emitted (early-fired).
> This task aims to implement the initial version of unbounded groupby
> aggregate, where we update and emit the aggregate value for each arriving
> record. In the future, we will implement the mechanism and interface to let
> users define the frequency/period of early-firing the unbounded groupby
> aggregation results.
> The limited space of the backend state is one of the major obstacles to
> supporting unbounded groupby aggregate in practice. For this reason, we
> suggest two common (and very useful) use-cases of this unbounded groupby
> aggregate:
> 1. The range of the grouping key is limited. In this case, a newly arrived
> record will either be inserted into the state as a new record or replace the
> existing record in the backend state. The data in the backend state will not
> be evicted if the resource is properly provisioned by the user, so that we
> can guarantee the correctness of the aggregation results.
> 2. When the grouping key is unlimited, we will not be able to ensure the 100%
> correctness of "unbounded groupby aggregate".
> In this case, we will rely on the TTL mechanism of the RocksDB backend state
> to evict old data, so that we can provide correct results within a certain
> time range.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
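
The per-record "update and emit" behaviour described in the issue can be illustrated without Flink. The following is only a minimal sketch under stated assumptions: `Acc`, `UnboundedGroupBySketch` and `run` are hypothetical names, a plain mutable map stands in for the keyed backend state, and a running sum and count stand in for the configured aggregate functions. It is not the code from the pull request.

```scala
import scala.collection.mutable

// Hypothetical accumulator: a running sum and count per grouping key.
final case class Acc(sum: Long, count: Long)

object UnboundedGroupBySketch {

  /** Folds each record into per-key state and emits one updated result per record. */
  def run(records: Seq[(String, Long)]): Seq[(String, Long, Long)] = {
    // Stand-in for keyed backend state (assumption: everything fits in memory).
    val state = mutable.Map.empty[String, Acc]
    records.map { case (key, value) =>
      // Create the accumulator the first time a key is seen.
      val prev = state.getOrElse(key, Acc(0L, 0L))
      val next = Acc(prev.sum + value, prev.count + 1)
      state.update(key, next)
      // Emit immediately; a later emission for the same key replaces this one.
      (key, next.sum, next.count)
    }
  }

  def main(args: Array[String]): Unit =
    run(Seq(("a", 1L), ("b", 5L), ("a", 2L))).foreach(println)
}
```

In this sketch the state holds one entry per distinct key and nothing is ever evicted, which is why the issue separates the bounded-key case from the unbounded one that depends on state TTL.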