// kafka table tableEnv.execuetSql("CREATE TABLE market_stock(\n" +
" Code STRING,\n" + " Amount BIGINT,\n" + ...... " PRIMARY KEY (Code) NOT ENFORCED\n" + ") WITH (\n" + " 'connector' = 'upsert-kafka',\n" + " 'topic' = 'zzz',\n" + " 'properties.bootstrap.servers' = '10.0.3.20:9092,10.0.3.24:9092,10.0.3.26:9092',\n" + " 'properties.group.id' = 'sqltest46',\n" + " 'key.format' = 'raw',\n" + " 'value.format' = 'json'\n" + ")"); // 使用UDAF计算 Table table = bsTableEnv.sqlQuery("select Code,MainFundFlowFunc(Amount,AskPrice1,BidPrice1,Last) from market_stock GROUP BY Code"); env.toRetractStream(table,Row.class).print(); // UDAF的定义如下 public class MainFundFlowFunc extends AggregateFunction<Row, AmountAccum> { @Override public Row getValue(AmountAccum acc) { Long mf = acc.ebb + acc.bb - acc.ebs - acc.bs; double mfr = acc.lastAmount > 0 ? MyNumericCalculator.divide(mf,acc.lastAmount,2).doubleValue() : 0.0; return Row.of(acc.ebs,acc.bs,acc.ms,acc.ss,acc.ebb,acc.bb,acc.mb,acc.sb,mf,mfr); } @Override public AmountAccum createAccumulator() { return new AmountAccum(); } public void accumulate(AmountAccum acc, Long amount, Double askPrice1, Double bidPrice1, Double last) { //...... acc.lastAmount = amount; acc.lastAskPrice1 = askPrice1; acc.lastBidPrice1 = bidPrice1; } public void retract(AmountAccum acc, Long amount, Double askPrice1, Double bidPrice1, Double last) { acc.lastAmount = amount; acc.lastAskPrice1 = askPrice1; acc.lastBidPrice1 = bidPrice1; } } // acc public class AmountAccum { public Double lastAskPrice1; public Double lastBidPrice1; public Long lastAmount = 0L; public Long ebs = 0L; public Long bs = 0L; public Long ms = 0L; public Long ss = 0L; public Long ebb = 0L; public Long bb = 0L; public Long mb = 0L; public Long sb = 0L; } debug观察acc的lastAmount值,一直是0. 刚才试了一下用sum()函数,执行select Code,sum(Amount) from market_stock GROUP BY Code,发现并没有累加Amount字段的值,每一次输出都是最新的那个Amount值。 是我的使用姿势不对吗= = 在 2020-12-10 11:30:31,"Jark Wu" <imj...@gmail.com> 写道: >可以发下代码吗? 
> >On Thu, 10 Dec 2020 at 11:19, bulterman <15618338...@163.com> wrote: > >> 上游是upsert-kafka connector 创建的table, >> 使用UDAF时发现accumulator里的变量一直没被更新?如果使用kafka connector是正常更新的 >> (为了测试方便,table里只有同一个PK的数据)