你可以把 upsert kafka 想象成是 mysql 表的实时物化视图,
你在 mysql 里面 code 是 key,amount 是 value。当你把 amount 从0 更新成 100, 200。
那么最后的 sum(amount) 结果自然是 200。

如果你想要 0 -> 100 -> 300, 说明你不想把这个数据看成是有 pk 更新的数据,而是一条条独立的数据,这个时候你声明成 kafka
connector,不定义 pk 即可,也就是当成普通 log 处理了。

关于你的 UDAF 的问题,估计是你实现的问题,因为你在 retract 方法中又把值设回 previous 值了。

Best,
Jark



On Thu, 10 Dec 2020 at 15:04, bulterman <15618338...@163.com> wrote:

> 假设Code X,第一条数据X.Amount=0,第二条数据X.Amount=100,第三条数据X.Amount=200
> 1、由于Code是主键,table中每次仅保留了第最新那条X的数据,因此select sum(X.Amount) from table的输出是
> :0, 100, 200
> 2、我定义UDAF中,对于同一个Code X来说,在accumulate方法中每次都会执行acc.lastAmount =
> Amount去更新acc的状态,但从结果来看,对于同一个Code X,每一次进入方法acc.lastAmount都是0?
> 也是因为表中仅保留一条Code X的数据的关系吗?
>
>
> 那在upsert kafka table中(Code X只保留最新一条数据),假设要累加Code
> X的Amount,期望的输出是:0,100,300...,应该如何实现?
> 求大佬解惑><
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-12-10 13:47:57,"Jark Wu" <imj...@gmail.com> 写道:
> >因为你的 upsert kafka table 的 pk 是 code,所以 code 分组下,数据已经是唯一的了 (一个 code
> >下,只有一行,取最后一行作为最新数据)。估计你同样的 code 下,amount 值是一样的,所以 sum(amount) 自然不会变化。
> >
> >Best,
> >Jark
> >
> >On Thu, 10 Dec 2020 at 12:36, bulterman <15618338...@163.com> wrote:
> >
> >> // kafka table
> >> tableEnv.execuetSql("CREATE TABLE  market_stock(\n" +
> >>
> >>                 "    Code STRING,\n" +
> >>
> >>                 "    Amount BIGINT,\n" +
> >>
> >>                 ......
> >>
> >>                 "    PRIMARY KEY (Code) NOT ENFORCED\n" +
> >>
> >>                 ") WITH (\n" +
> >>
> >>                 "    'connector' = 'upsert-kafka',\n" +
> >>
> >>                 "    'topic' = 'zzz',\n" +
> >>
> >>                 "    'properties.bootstrap.servers' = '10.0.3.20:9092,
> >> 10.0.3.24:9092,10.0.3.26:9092',\n" +
> >>
> >>                 "    'properties.group.id' = 'sqltest46',\n" +
> >>
> >>                 "    'key.format' = 'raw',\n" +
> >>
> >>                 "    'value.format' = 'json'\n" +
> >>
> >>                 ")");
> >> // 使用UDAF计算
> >> Table table = bsTableEnv.sqlQuery("select
> >> Code,MainFundFlowFunc(Amount,AskPrice1,BidPrice1,Last) from market_stock
> >> GROUP BY Code");
> >> env.toRetractStream(table,Row.class).print();
> >>
> >>
> >> // UDAF的定义如下
> >> public class MainFundFlowFunc extends AggregateFunction<Row,
> AmountAccum> {
> >>     @Override
> >>     public Row getValue(AmountAccum acc) {
> >>         Long mf = acc.ebb + acc.bb - acc.ebs - acc.bs;
> >>         double mfr = acc.lastAmount > 0 ?
> >> MyNumericCalculator.divide(mf,acc.lastAmount,2).doubleValue() : 0.0;
> >>         return Row.of(acc.ebs,acc.bs,acc.ms,acc.ss,acc.ebb,acc.bb
> ,acc.mb,
> >> acc.sb,mf,mfr);
> >>     }
> >>     @Override
> >>     public AmountAccum createAccumulator() {
> >>         return new AmountAccum();
> >>     }
> >>
> >>     public void accumulate(AmountAccum acc, Long amount, Double
> askPrice1,
> >> Double bidPrice1, Double last) {
> >>         //......
> >>        acc.lastAmount = amount;
> >>         acc.lastAskPrice1 = askPrice1;
> >>         acc.lastBidPrice1 = bidPrice1;
> >>     }
> >>     public void retract(AmountAccum acc, Long amount, Double askPrice1,
> >> Double bidPrice1, Double last) {
> >>         acc.lastAmount = amount;
> >>         acc.lastAskPrice1 = askPrice1;
> >>         acc.lastBidPrice1 = bidPrice1;
> >>     }
> >>
> >> }
> >>
> >>
> >>
> >>
> >> // acc
> >> public class AmountAccum {
> >>     public Double lastAskPrice1;
> >>     public Double lastBidPrice1;
> >>
> >>     public Long lastAmount = 0L;
> >>
> >>     public Long ebs = 0L;
> >>
> >>     public Long bs = 0L;
> >>
> >>     public Long ms = 0L;
> >>
> >>     public Long ss = 0L;
> >>
> >>     public Long ebb = 0L;
> >>
> >>     public Long bb = 0L;
> >>
> >>     public Long mb = 0L;
> >>
> >>     public Long sb = 0L;
> >> }
> >>
> >>
> >> debug观察acc的lastAmount值,一直是0.
> >>
> >>
> >> 刚才试了一下用sum()函数,执行select Code,sum(Amount) from market_stock GROUP BY
> >> Code,发现并没有累加Amount字段的值,每一次输出都是最新的那个Amount值。
> >> 是我的使用姿势不对吗= =
> >>
> >> 在 2020-12-10 11:30:31,"Jark Wu" <imj...@gmail.com> 写道:
> >> >可以发下代码吗?
> >> >
> >> >On Thu, 10 Dec 2020 at 11:19, bulterman <15618338...@163.com> wrote:
> >> >
> >> >> 上游是upsert-kafka connector 创建的table,
> >> >> 使用UDAF时发现accumlator里的变量一直没被更新?如果使用kafka connector是正常更新的
> >> >> (为了测试方便,table里只有同一个PK的数据)
> >>
>

回复