Hi, You can implement TopN on SQL/Table-api or write a datastream job with ProcessFunction to solve the problem.
Best, Hequn On Fri, Sep 28, 2018 at 9:38 AM 徐涛 <happydexu...@gmail.com> wrote: > Hi Hequn, > If limit n is not supported in streaming, how to solve top n problem in > stream scenario? > > Best > Henry > > 在 2018年9月28日,上午12:03,Hequn Cheng <chenghe...@gmail.com> 写道: > > Hi Henry, > > Currently, Order By is supported in Streaming&Batch while Limit is only > supported in Batch. Another thing to be noted is, for Order by, the result > of streaming queries must be primarily sorted on an ascending time > attribute[1]. > > [1] > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#orderby--limit > > > > On Thu, Sep 27, 2018 at 9:05 PM 徐涛 <happydexu...@gmail.com> wrote: > >> Hi, >> I want a top n result on each hop window result, but some error throws >> out when I add the order by sentence or the limit sentence, so how do I >> implement such case ? >> Thanks a lot. >> >> SELECT >> >> trackId as id,track_title as description, count(*) as cnt >> >> FROM >> >> play >> >> WHERE >> >> appName='play.statistics.trace' >> >> GROUP BY >> >> HOP(started_at_ts, INTERVAL '1' SECOND, INTERVAL '5' >> MINUTE),trackId,track_title >> >> ORDER BY >> >> cnt desc >> >> LIMIT 10 >> >> >> FlinkLogicalSort(sort0=[$2], dir0=[DESC]) >> FlinkLogicalWindowAggregate(group=[{1, 2}], cnt=[COUNT()]) >> FlinkLogicalCalc(expr#0..4=[{inputs}], >> expr#5=[_UTF-16LE'play.statistics.trace'], expr#6=[=($t0, $t5)], >> started_at_ts=[$t4], trackId=[$t1], track_title=[$t2], $condition=[$t6]) >> FlinkLogicalNativeTableScan(table=[[play]]) >> >> This exception indicates that the query uses an unsupported SQL feature. >> Please check the documentation for the set of currently supported SQL >> features. >> at >> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:275) >> at >> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:845) >> at >> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:892) >> at >> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344) >> at >> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:786) >> at >> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:723) >> at >> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683) >> >> >> Best >> Henry >> > >