// kafka table
tableEnv.execuetSql("CREATE TABLE  market_stock(\n" +

                "    Code STRING,\n" +

                "    Amount BIGINT,\n" +

                ......

                "    PRIMARY KEY (Code) NOT ENFORCED\n" +

                ") WITH (\n" +

                "    'connector' = 'upsert-kafka',\n" +

                "    'topic' = 'zzz',\n" +

                "    'properties.bootstrap.servers' = 
'10.0.3.20:9092,10.0.3.24:9092,10.0.3.26:9092',\n" +

                "    'properties.group.id' = 'sqltest46',\n" +

                "    'key.format' = 'raw',\n" +

                "    'value.format' = 'json'\n" +

                ")");
// 使用UDAF计算
Table table = bsTableEnv.sqlQuery("select 
Code,MainFundFlowFunc(Amount,AskPrice1,BidPrice1,Last) from market_stock GROUP 
BY Code");
env.toRetractStream(table,Row.class).print();


// The UDAF is defined as follows
/**
 * Aggregate function that keeps its state in an {@link AmountAccum} and emits a
 * ten-field {@link Row} per group.
 *
 * <p>NOTE(review): when the input is an upsert/changelog stream, the framework may
 * call {@code retract} for the previous row of a key before {@code accumulate} for
 * the new one; whether the accumulator survives between those calls depends on the
 * planner — confirm against the Flink version in use.
 */
public class MainFundFlowFunc extends AggregateFunction<Row, AmountAccum> {

    /**
     * Creates a fresh accumulator with its default field values.
     *
     * @return a new {@link AmountAccum}
     */
    @Override
    public AmountAccum createAccumulator() {
        return new AmountAccum();
    }

    /**
     * Builds the result row from the accumulator.
     *
     * @param acc the accumulator to read
     * @return a row of (ebs, bs, ms, ss, ebb, bb, mb, sb, net, ratio), where
     *         net = ebb + bb - ebs - bs and ratio is net divided by lastAmount
     *         (0.0 when lastAmount is not positive)
     */
    @Override
    public Row getValue(AmountAccum acc) {
        Long netFlow = acc.ebb + acc.bb - acc.ebs - acc.bs;

        double netFlowRatio;
        if (acc.lastAmount > 0) {
            // Third argument is presumably the decimal scale — TODO confirm against MyNumericCalculator.
            netFlowRatio = MyNumericCalculator.divide(netFlow, acc.lastAmount, 2).doubleValue();
        } else {
            netFlowRatio = 0.0;
        }

        return Row.of(
                acc.ebs, acc.bs, acc.ms, acc.ss,
                acc.ebb, acc.bb, acc.mb, acc.sb,
                netFlow, netFlowRatio);
    }

    /**
     * Folds one input row into the accumulator and records it as the latest row seen.
     *
     * @param acc       accumulator to update
     * @param amount    value stored as lastAmount
     * @param askPrice1 value stored as lastAskPrice1
     * @param bidPrice1 value stored as lastBidPrice1
     * @param last      not used by the visible part of this method
     */
    public void accumulate(AmountAccum acc, Long amount, Double askPrice1,
                           Double bidPrice1, Double last) {
        //......
        acc.lastAmount = amount;
        acc.lastAskPrice1 = askPrice1;
        acc.lastBidPrice1 = bidPrice1;
    }

    /**
     * Handles a retracted input row.
     *
     * <p>NOTE(review): this overwrites the last* fields with the retracted row's
     * values rather than undoing a previous accumulate — confirm that is intended.
     *
     * @param acc       accumulator to update
     * @param amount    value stored as lastAmount
     * @param askPrice1 value stored as lastAskPrice1
     * @param bidPrice1 value stored as lastBidPrice1
     * @param last      not used by this method
     */
    public void retract(AmountAccum acc, Long amount, Double askPrice1,
                        Double bidPrice1, Double last) {
        acc.lastAmount = amount;
        acc.lastAskPrice1 = askPrice1;
        acc.lastBidPrice1 = bidPrice1;
    }
}




// acc
/**
 * Mutable accumulator used by {@code MainFundFlowFunc}.
 *
 * <p>All counters start at zero; the two price fields start unset ({@code null}).
 * Fields are public and are read and written directly by the aggregate function.
 */
public class AmountAccum {
    // Ask/bid prices of the most recently processed row; null until first written.
    public Double lastAskPrice1, lastBidPrice1;

    // Amount of the most recently processed row.
    public Long lastAmount = 0L;

    // Sell-side counters — presumably extra-big / big / medium / small orders; TODO confirm.
    public Long ebs = 0L, bs = 0L, ms = 0L, ss = 0L;

    // Buy-side counters — presumably extra-big / big / medium / small orders; TODO confirm.
    public Long ebb = 0L, bb = 0L, mb = 0L, sb = 0L;
}


debug观察acc的lastAmount值,一直是0.


刚才试了一下用sum()函数,执行 select Code,sum(Amount) from market_stock GROUP BY Code,发现并没有累加Amount字段的值,每一次输出都是最新的那个Amount值。
是我的使用姿势不对吗= =

在 2020-12-10 11:30:31,"Jark Wu" <imj...@gmail.com> 写道:
>可以发下代码吗?
>
>On Thu, 10 Dec 2020 at 11:19, bulterman <15618338...@163.com> wrote:
>
>> 上游是upsert-kafka connector 创建的table,
>> 使用UDAF时发现accumulator里的变量一直没被更新?如果使用kafka connector是正常更新的
>> (为了测试方便,table里只有同一个PK的数据)

回复