You can refer to an existing example in the Flink codebase: https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java
 
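For a smaller starting point than the class linked above, here is a minimal sketch of an aggregate function whose accumulator holds a MapView. It is written in Java to match the linked Flink class and assumes Flink 1.9 with flink-table-common on the classpath. The names DistinctCountFunction, DistinctCountAcc, seen, and count are made up for illustration. Whether the MapView ends up backed by Flink state or by a plain in-memory map depends on the planner and the context in which the function runs, so treat this as a sketch rather than a reference implementation.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.functions.AggregateFunction;

/** Hypothetical example: counts the distinct non-null String values in a group. */
public class DistinctCountFunction
        extends AggregateFunction<Long, DistinctCountFunction.DistinctCountAcc> {

    /** Accumulator written as a POJO: public fields, implicit public no-arg constructor. */
    public static class DistinctCountAcc {
        // Values seen so far; the Integer is only a placeholder value.
        public MapView<String, Integer> seen;
        // Number of distinct values seen so far.
        public long count;
    }

    @Override
    public DistinctCountAcc createAccumulator() {
        DistinctCountAcc acc = new DistinctCountAcc();
        // Key and value types are passed explicitly.
        acc.seen = new MapView<>(Types.STRING, Types.INT);
        acc.count = 0L;
        return acc;
    }

    /** Called once per input row; looked up by name, so it is not an override. */
    public void accumulate(DistinctCountAcc acc, String value) {
        if (value == null) {
            return;
        }
        try {
            // MapView methods declare "throws Exception" because they may touch state.
            if (!acc.seen.contains(value)) {
                acc.seen.put(value, 1);
                acc.count++;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Long getValue(DistinctCountAcc acc) {
        return acc.count;
    }
}
```

The point of the sketch is that the accumulator reads and writes the MapView field directly instead of obtaining a RuntimeContext in open(). The same shape should carry over to a Scala function, as long as the accumulator is still recognized as a POJO.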
> On Nov 7, 2019, at 7:06 PM, Chennet Steven <stevenchen...@live.com> wrote:
> 
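> In flink-table-common in Flink 1.9 I found the DataView interface and its subclasses ListView and MapView, but I could not work out how to use them in a user-defined function.
> Could you share an example or a link to some test code?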
> 
> From stevenchen
>         webchat 38798579
> 
> ________________________________
> From: wenlong.lwl <wenlong88....@gmail.com>
> Sent: Thursday, November 7, 2019 2:13:43 PM
> To: user-zh@flink.apache.org <user-zh@flink.apache.org>
> Subject: Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function
> 
> You could try 1.9. It introduced the DataView mechanism, which lets you use state inside the accumulator (Acc).
> 
> On Thu, 7 Nov 2019 at 09:22, Chennet Steven <stevenchen...@live.com> wrote:
> 
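>> I tried to use State in a Flink user-defined aggregate function, but found that open() cannot obtain a RuntimeContext through the FunctionContext.
>> How can State be used in an aggregate function?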
>> 
>> 
>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
>> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
>> import org.apache.flink.api.java.typeutils.TupleTypeInfo
>> import org.apache.flink.table.functions.{AggregateFunction, FunctionContext}
>> import java.lang.{Iterable => JIterable}
>> 
>> 
>> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>> 
>> class IntDiffSumFunction extends AggregateFunction[Int, IntDiffSumAccumulator] {
>> 
>>  override def open(context: FunctionContext): Unit = {
>>    // In Flink 1.7.2 the RuntimeContext cannot be obtained here, so State cannot be initialized
>>    //getRuntimeContext.getState(desc)
>>    val a = this.hashCode()
>>    print(s"hashCode:$a")
>>    super.open(context)
>>  }
>> 
>>  override def createAccumulator(): IntDiffSumAccumulator = {
>>    val acc = new IntDiffSumAccumulator()
>>    acc.f0 = 0
>>    acc.f1 = false
>>    acc
>>  }
>> 
>>  def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
>>    accumulator.f0 += value
>>    accumulator.f1 = true
>>  }
>> 
>>  override def getValue(accumulator: IntDiffSumAccumulator): Int = {
>>    if (accumulator.f1) {
>> 
>>      accumulator.f0
>>    } else {
>>      Int.MinValue
>>    }
>>  }
>> 
>>  def merge(acc: IntDiffSumAccumulator, its: JIterable[IntDiffSumAccumulator]): Unit = {
>>    val iter = its.iterator()
>>    // Stop once the iterator is exhausted; with while (true), next() eventually throws
>>    while (iter.hasNext) {
>>      val a = iter.next()
>>      if (a.f1) {
>>        acc.f0 += a.f0
>>        acc.f1 = true
>>      }
>>    }
>>  }
>> 
>>  def resetAccumulator(acc: IntDiffSumAccumulator) = {
>>    acc.f0 = 0
>>    acc.f1 = false
>>  }
>> 
>>  override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
>>    new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO)
>> }
>> 
>> 
>> From stevenchen
>>         webchat 38798579
>> 
>> 
>> 
