这个可以,非常感谢。 select user, sum(num * IF(flag=1, 1, 0)) as num > from ( > select user, ord, LAST_VALUE(num) as num, LAST_VALUE(flag) as flag > from tmpTable, > group by user, ord > ) t1 > group by user
------- > 2021年11月25日 11:19,Tony Wei <tony19920...@gmail.com> 写道: > > 上一封的 sql 稍微有誤,不需要 group by user, ord 才對: > > select user, sum(num) as num >> from ( >> select user, ord, num * IF(flag=1, 1, -1) as num >> from tmpTable >> ) t1 >> group by user > > > 或者也可以考慮這種寫法: > > select user, sum(num * IF(flag=1, 1, 0)) as num >> from ( >> select user, ord, LAST_VALUE(num) as num, LAST_VALUE(flag) as flag >> from tmpTable, >> group by user, ord >> ) t1 >> group by user > > > best regards, > > Tony Wei <tony19920...@gmail.com> 於 2021年11月25日 週四 上午11:01寫道: > >> Hi, >> >> 對於這個 sql 的回撤流,我感覺沒什麼問題。原因是如果在 batch 執行模式下,首三行的輸出結果應該為: >> >> +------+-------+ >> | user | num | >> +------+-------+ >> | b | 20 | >> +------+-------+ >> >> 因為 user a 被 `where flag = 1` 過濾掉了,所以實際下游應該是撤回了 insert 的操作才對,而不是更新成 num = 0。 >> >> 或許可以考慮把 sql 寫法改為這樣試試? >> >> select user, sum(num) as num >>> from ( >>> select user, ord, num * IF(flag=1, 1, -1) as num >>> from tmpTable, >>> group by user, ord >>> ) t1 >>> group by user >> >> >> best regards, >> >> wushijjian5 <wsjwoods...@163.com> 於 2021年11月25日 週四 上午10:21寫道: >> >>> Hi, >>> 这三条数据的话: >>> new Tuple4<>("a", "a1",30,1),new Tuple4<>("b", "b1",20,1),new >>> Tuple4<>("a","a1",30,0) >>> >>> 计算结果是: >>> | +I | a | 30 | >>> | +I | b | 20 | >>> | -D | a | 30 | >>> >>> 实际想要的是 >>> a 30 >>> b 20 >>> a 0 >>> 就是a用户下单30块,b下单20块,a取消下单30块 , 最后a应该返回0 , 想要根据-D标识下发0值数据,下游直接覆盖。 >>> 不知道说的清不清楚。 >>> 只通过flink-sql方式好像实现不了 >>> >>>> 2021年11月25日 09:45,Caizhi Weng <tsreape...@gmail.com> 写道: >>>> >>>> Hi! >>>> >>>> 无法在 SQL 里获得第一列的操作符。但可以通过 table.execute().collect() 获得产生的 >>> CloseableIterator<Row>,然后通过 Row#getKind 获得该 row 对应的 op。 >>>> >>>> 顺便问一下,为什么需要在 SQL 里获得 op 呢?因为这个 op 应该只对内部算子以及 sink 有用,用户一般来说是不需要感知的。 >>>> >>>> wushijjian5 <wsjwoods...@163.com <mailto:wsjwoods...@163.com>> >>> 于2021年11月24日周三 下午9:05写道: >>>> >>>> >>>> DataStream<Tuple4<String, String,Integer, Integer>> dataStream = >>> env.fromElements( >>>> new Tuple4<>("a", "a1",30,1), >>>> new Tuple4<>("b", "b1",20,1),new Tuple4<>("a","a1",30,0), >>>> new Tuple4<>("a","a2",30,1), >>>> new Tuple4<>("a","a3",30,1)); >>>> tEnv.createTemporaryView("tmpTable", dataStream, $("user"),$("ord"), >>> $("num"), $("flag")); >>>> Table table = tEnv.sqlQuery( >>>> " select user,sum(num) as num" + >>>> " from (" + >>>> " select user,ord,LAST_VALUE(num) as num,LAST_VALUE(flag) as >>> flag " + >>>> "from tmpTable " + >>>> "group by user,ord " + >>>> ") t1" + >>>> " where flag=1 " + >>>> " group by user" + >>>> ""); >>>> table.execute().print(); >>>> >>>> 这样一个回撤流,我能在sql里获取到第一列的操作符么,比如 select op,user,num from xxx 只通过sql的方式 >>>> >>> >>>