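Suppose we have Code X, where the first record has X.Amount=0, the second has X.Amount=100, and the third has X.Amount=200.

1. Because Code is the primary key, the table only ever keeps the latest record for X, so the output of select sum(X.Amount) from table is: 0, 100, 200.
2. In my UDAF, for the same Code X, the accumulate method runs acc.lastAmount = Amount on every call to update the accumulator state. Yet judging from the results, every time the method is entered for the same Code X, acc.lastAmount is 0. Is this also because the table keeps only one row for Code X?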
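Point 1 can be reproduced without Flink. The sketch below is a plain-Java model, not Flink's runtime: it assumes that each new row for an existing key reaches the aggregation as a retraction of the old row followed by an accumulation of the new row (upsert semantics on a primary key), and contrasts that with an append-only stream in which every row is simply added. The class and method names (UpsertSumDemo, upsertSum, appendSum) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UpsertSumDemo {

    // Upsert model: only the latest Amount per Code is kept, so each update
    // is seen by sum() as "retract the old row, accumulate the new row".
    static List<Long> upsertSum(String[] codes, long[] amounts) {
        Map<String, Long> latestByCode = new HashMap<>();
        long sum = 0L;
        List<Long> outputs = new ArrayList<>();
        for (int i = 0; i < codes.length; i++) {
            Long previous = latestByCode.put(codes[i], amounts[i]);
            if (previous != null) {
                sum -= previous; // retract the row being replaced
            }
            sum += amounts[i];   // accumulate the new row
            outputs.add(sum);
        }
        return outputs;
    }

    // Append-only model: every row is a new fact and is simply added.
    static List<Long> appendSum(long[] amounts) {
        long sum = 0L;
        List<Long> outputs = new ArrayList<>();
        for (long amount : amounts) {
            sum += amount;
            outputs.add(sum);
        }
        return outputs;
    }

    public static void main(String[] args) {
        String[] codes = {"X", "X", "X"};
        long[] amounts = {0L, 100L, 200L};
        System.out.println(upsertSum(codes, amounts)); // [0, 100, 200]
        System.out.println(appendSum(amounts));        // [0, 100, 300]
    }
}
```

Under these assumptions, the upsert model yields the 0, 100, 200 that sum() printed, while only the append-only model yields a running total of 0, 100, 300. That is consistent with the earliest message in the thread, which reports that the same UDAF updates normally on a plain kafka connector table.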
So in an upsert kafka table (where only the latest row for Code X is kept), if I want to accumulate the Amount of Code X with the expected output 0, 100, 300..., how should I implement it? Any help would be much appreciated ><

On 2020-12-10 13:47:57, "Jark Wu" <imj...@gmail.com> wrote:
>Because the pk of your upsert kafka table is code, the data is already unique within each code group (there is only one row per code, and the last row is taken as the latest data). I'd guess the amount values under the same code are identical, so sum(amount) naturally does not change.
>
>Best,
>Jark
>
>On Thu, 10 Dec 2020 at 12:36, bulterman <15618338...@163.com> wrote:
>
>> // kafka table
>> tableEnv.executeSql("CREATE TABLE market_stock(\n" +
>>     "  Code STRING,\n" +
>>     "  Amount BIGINT,\n" +
>>     ......
>>     "  PRIMARY KEY (Code) NOT ENFORCED\n" +
>>     ") WITH (\n" +
>>     "  'connector' = 'upsert-kafka',\n" +
>>     "  'topic' = 'zzz',\n" +
>>     "  'properties.bootstrap.servers' = '10.0.3.20:9092,10.0.3.24:9092,10.0.3.26:9092',\n" +
>>     "  'properties.group.id' = 'sqltest46',\n" +
>>     "  'key.format' = 'raw',\n" +
>>     "  'value.format' = 'json'\n" +
>>     ")");
>> // compute with the UDAF
>> Table table = bsTableEnv.sqlQuery("select Code,MainFundFlowFunc(Amount,AskPrice1,BidPrice1,Last) from market_stock GROUP BY Code");
>> env.toRetractStream(table,Row.class).print();
>>
>> // The UDAF is defined as follows
>> public class MainFundFlowFunc extends AggregateFunction<Row, AmountAccum> {
>>     @Override
>>     public Row getValue(AmountAccum acc) {
>>         Long mf = acc.ebb + acc.bb - acc.ebs - acc.bs;
>>         double mfr = acc.lastAmount > 0 ?
>>             MyNumericCalculator.divide(mf,acc.lastAmount,2).doubleValue() : 0.0;
>>         return Row.of(acc.ebs,acc.bs,acc.ms,acc.ss,acc.ebb,acc.bb,acc.mb,
>>             acc.sb,mf,mfr);
>>     }
>>     @Override
>>     public AmountAccum createAccumulator() {
>>         return new AmountAccum();
>>     }
>>
>>     public void accumulate(AmountAccum acc, Long amount, Double askPrice1,
>>             Double bidPrice1, Double last) {
>>         //......
>>         acc.lastAmount = amount;
>>         acc.lastAskPrice1 = askPrice1;
>>         acc.lastBidPrice1 = bidPrice1;
>>     }
>>     public void retract(AmountAccum acc, Long amount, Double askPrice1,
>>             Double bidPrice1, Double last) {
>>         acc.lastAmount = amount;
>>         acc.lastAskPrice1 = askPrice1;
>>         acc.lastBidPrice1 = bidPrice1;
>>     }
>> }
>>
>> // acc
>> public class AmountAccum {
>>     public Double lastAskPrice1;
>>     public Double lastBidPrice1;
>>     public Long lastAmount = 0L;
>>     public Long ebs = 0L;
>>     public Long bs = 0L;
>>     public Long ms = 0L;
>>     public Long ss = 0L;
>>     public Long ebb = 0L;
>>     public Long bb = 0L;
>>     public Long mb = 0L;
>>     public Long sb = 0L;
>> }
>>
>> While debugging I watched the lastAmount value of acc, and it was always 0.
>>
>> I also just tried the sum() function by running select Code,sum(Amount) from market_stock GROUP BY Code, and found that it did not accumulate the Amount field either: every output is just the latest Amount value.
>> Am I using it the wrong way? = =
>>
>> On 2020-12-10 11:30:31, "Jark Wu" <imj...@gmail.com> wrote:
>> >Could you post the code?
>> >
>> >On Thu, 10 Dec 2020 at 11:19, bulterman <15618338...@163.com> wrote:
>> >
>> >> The upstream is a table created with the upsert-kafka connector.
>> >> When using a UDAF, I found that the variables in the accumulator are never updated. With the kafka connector they are updated normally.
>> >> (To make testing easier, the table only contains data for a single PK.)
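The debug observation (acc.lastAmount is always 0 on entry to accumulate) can be illustrated with a simplified model. This is a hypothetical sketch, not Flink source code, and it rests on an assumption: that a retracting group aggregation keeps a hidden live-row count per key and discards the accumulator once a retraction brings that count to zero. With a single key, every update first retracts the only row of the group, so the state is dropped and the following accumulate starts from a freshly created accumulator. The names AccumulatorResetDemo, Acc, rowCount and replayUpserts are invented for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AccumulatorResetDemo {

    // Cut-down stand-in for the AmountAccum in the thread.
    static class Acc {
        long lastAmount = 0L;
        long rowCount = 0L; // hypothetical hidden counter of live rows in the group
    }

    private final Map<String, Acc> state = new HashMap<>();
    // Records the value of acc.lastAmount seen on entry to accumulate().
    final List<Long> seenOnAccumulate = new ArrayList<>();

    void accumulate(String key, long amount) {
        // A new Acc is created whenever no state exists for the key.
        Acc acc = state.computeIfAbsent(key, k -> new Acc());
        seenOnAccumulate.add(acc.lastAmount);
        acc.lastAmount = amount;
        acc.rowCount++;
    }

    void retract(String key, long amount) {
        Acc acc = state.get(key);
        acc.lastAmount = amount; // mirrors the retract() in the thread
        acc.rowCount--;
        if (acc.rowCount == 0) {
            state.remove(key);   // assumed: empty group, accumulator discarded
        }
    }

    // Replays successive upserts for one key as retract-old + accumulate-new pairs.
    static List<Long> replayUpserts(String key, long[] amounts) {
        AccumulatorResetDemo agg = new AccumulatorResetDemo();
        Long previous = null;
        for (long amount : amounts) {
            if (previous != null) {
                agg.retract(key, previous); // old row withdrawn
            }
            agg.accumulate(key, amount);    // new row added
            previous = amount;
        }
        return agg.seenOnAccumulate;
    }

    public static void main(String[] args) {
        System.out.println(replayUpserts("X", new long[] {0L, 100L, 200L})); // [0, 0, 0]
    }
}
```

If the assumption holds, no per-key accumulator survives from one update to the next when each update retracts the group's only row, which would match the value of 0 seen in the debugger on every call.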