你可以把 upsert kafka 想象成是 mysql 表的实时物化视图, 你在 mysql 里面 code 是 key,amount 是 value。当你把 amount 从0 更新成 100, 200。 那么最后的 sum(amount) 结果自然是 200。
如果你想要 0 -> 100 -> 300, 说明你不想把这个数据看成是有 pk 更新的数据,而是一条条独立的数据,这个时候你声明成 kafka connector,不定义 pk 即可,也就是当成普通 log 处理了。 关于你的 UDAF 的问题,估计是你实现的问题,因为你在 retract 方法中又把值设回 previous 值了。 Best, Jark On Thu, 10 Dec 2020 at 15:04, bulterman <15618338...@163.com> wrote: > 假设Code X,第一条数据X.Amount=0,第二条数据X.Amount=100,第三条数据X.Amount=200 > 1、由于Code是主键,table中每次仅保留了第最新那条X的数据,因此select sum(X.Amount) from table的输出是 > :0, 100, 200 > 2、我定义UDAF中,对于同一个Code X来说,在accumulate方法中每次都会执行acc.lastAmount = > Amount去更新acc的状态,但从结果来看,对于同一个Code X,每一次进入方法acc.lastAmount都是0? > 也是因为表中仅保留一条Code X的数据的关系吗? > > > 那在upsert kafka table中(Code X只保留最新一条数据),假设要累加Code > X的Amount,期望的输出是:0,100,300...,应该如何实现? > 求大佬解惑>< > > > > > > > > > > > > > > > > > > 在 2020-12-10 13:47:57,"Jark Wu" <imj...@gmail.com> 写道: > >因为你的 upsert kafka table 的 pk 是 code,所以 code 分组下,数据已经是唯一的了 (一个 code > >下,只有一行,取最后一行作为最新数据)。估计你同样的 code 下,amount 值是一样的,所以 sum(amount) 自然不会变化。 > > > >Best, > >Jark > > > >On Thu, 10 Dec 2020 at 12:36, bulterman <15618338...@163.com> wrote: > > > >> // kafka table > >> tableEnv.execuetSql("CREATE TABLE market_stock(\n" + > >> > >> " Code STRING,\n" + > >> > >> " Amount BIGINT,\n" + > >> > >> ...... > >> > >> " PRIMARY KEY (Code) NOT ENFORCED\n" + > >> > >> ") WITH (\n" + > >> > >> " 'connector' = 'upsert-kafka',\n" + > >> > >> " 'topic' = 'zzz',\n" + > >> > >> " 'properties.bootstrap.servers' = '10.0.3.20:9092, > >> 10.0.3.24:9092,10.0.3.26:9092',\n" + > >> > >> " 'properties.group.id' = 'sqltest46',\n" + > >> > >> " 'key.format' = 'raw',\n" + > >> > >> " 'value.format' = 'json'\n" + > >> > >> ")"); > >> // 使用UDAF计算 > >> Table table = bsTableEnv.sqlQuery("select > >> Code,MainFundFlowFunc(Amount,AskPrice1,BidPrice1,Last) from market_stock > >> GROUP BY Code"); > >> env.toRetractStream(table,Row.class).print(); > >> > >> > >> // UDAF的定义如下 > >> public class MainFundFlowFunc extends AggregateFunction<Row, > AmountAccum> { > >> @Override > >> public Row getValue(AmountAccum acc) { > >> Long mf = acc.ebb + acc.bb - acc.ebs - acc.bs; > >> double mfr = acc.lastAmount > 0 ? > >> MyNumericCalculator.divide(mf,acc.lastAmount,2).doubleValue() : 0.0; > >> return Row.of(acc.ebs,acc.bs,acc.ms,acc.ss,acc.ebb,acc.bb > ,acc.mb, > >> acc.sb,mf,mfr); > >> } > >> @Override > >> public AmountAccum createAccumulator() { > >> return new AmountAccum(); > >> } > >> > >> public void accumulate(AmountAccum acc, Long amount, Double > askPrice1, > >> Double bidPrice1, Double last) { > >> //...... > >> acc.lastAmount = amount; > >> acc.lastAskPrice1 = askPrice1; > >> acc.lastBidPrice1 = bidPrice1; > >> } > >> public void retract(AmountAccum acc, Long amount, Double askPrice1, > >> Double bidPrice1, Double last) { > >> acc.lastAmount = amount; > >> acc.lastAskPrice1 = askPrice1; > >> acc.lastBidPrice1 = bidPrice1; > >> } > >> > >> } > >> > >> > >> > >> > >> // acc > >> public class AmountAccum { > >> public Double lastAskPrice1; > >> public Double lastBidPrice1; > >> > >> public Long lastAmount = 0L; > >> > >> public Long ebs = 0L; > >> > >> public Long bs = 0L; > >> > >> public Long ms = 0L; > >> > >> public Long ss = 0L; > >> > >> public Long ebb = 0L; > >> > >> public Long bb = 0L; > >> > >> public Long mb = 0L; > >> > >> public Long sb = 0L; > >> } > >> > >> > >> debug观察acc的lastAmount值,一直是0. > >> > >> > >> 刚才试了一下用sum()函数,执行select Code,sum(Amount) from market_stock GROUP BY > >> Code,发现并没有累加Amount字段的值,每一次输出都是最新的那个Amount值。 > >> 是我的使用姿势不对吗= = > >> > >> 在 2020-12-10 11:30:31,"Jark Wu" <imj...@gmail.com> 写道: > >> >可以发下代码吗? > >> > > >> >On Thu, 10 Dec 2020 at 11:19, bulterman <15618338...@163.com> wrote: > >> > > >> >> 上游是upsert-kafka connector 创建的table, > >> >> 使用UDAF时发现accumlator里的变量一直没被更新?如果使用kafka connector是正常更新的 > >> >> (为了测试方便,table里只有同一个PK的数据) > >> >