Hi,

You can implement Top-N with the SQL/Table API, or write a DataStream job with a
ProcessFunction to solve the problem.
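
For the ProcessFunction route, a minimal sketch could look like the following. It
assumes the per-window counts, i.e. records of the form (windowEnd, trackId, cnt),
are already produced by a windowed aggregation or by converting the Table result to
a DataStream; PlayCount and TopNFunction are placeholder names, not Flink classes.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Placeholder POJO for one per-window, per-track count.
class PlayCount {
    public long windowEnd;
    public String trackId;
    public long cnt;
    public PlayCount() {}
}

// Emits the N largest counts of each window; apply it after keying by windowEnd.
public class TopNFunction extends ProcessFunction<PlayCount, PlayCount> {

    private final int topSize;
    private transient ListState<PlayCount> counts;

    public TopNFunction(int topSize) {
        this.topSize = topSize;
    }

    @Override
    public void open(Configuration parameters) {
        // Keyed state: one buffer of counts per window end.
        counts = getRuntimeContext().getListState(
                new ListStateDescriptor<>("counts", PlayCount.class));
    }

    @Override
    public void processElement(PlayCount value, Context ctx, Collector<PlayCount> out)
            throws Exception {
        // Buffer every per-track count and fire once the window has closed.
        counts.add(value);
        ctx.timerService().registerEventTimeTimer(value.windowEnd + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<PlayCount> out)
            throws Exception {
        List<PlayCount> all = new ArrayList<>();
        for (PlayCount c : counts.get()) {
            all.add(c);
        }
        counts.clear();

        // Sort by count descending and emit the top N for this window.
        all.sort(Comparator.comparingLong((PlayCount c) -> c.cnt).reversed());
        for (int i = 0; i < Math.min(topSize, all.size()); i++) {
            out.collect(all.get(i));
        }
    }
}

Applied after keying by the window end, e.g.
countStream.keyBy("windowEnd").process(new TopNFunction(10)), it buffers all counts
of one window and emits the ten largest once the window's event-time timer fires.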

Best, Hequn

On Fri, Sep 28, 2018 at 9:38 AM 徐涛 <happydexu...@gmail.com> wrote:

> Hi Hequn,
> If LIMIT n is not supported in streaming, how can the top-n problem be solved
> in a streaming scenario?
>
> Best
> Henry
>
> On Sep 28, 2018, at 12:03 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>
> Hi Henry,
>
> Currently, ORDER BY is supported in both streaming and batch, while LIMIT is
> only supported in batch. Another thing to note is that, for ORDER BY, the
> result of a streaming query must be primarily sorted on an ascending time
> attribute [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#orderby--limit
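>
> For example, a sort on the time attribute itself is accepted by the streaming
> planner. Just a sketch, assuming the play table is registered with
> started_at_ts declared as an event-time attribute (class and variable names
> here are placeholders):
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
>
> public class SortByTimeExample {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>         // ... register the play table here, with started_at_ts as an event-time attribute ...
>
>         // Sorting on the ascending time attribute is supported in streaming:
>         Table sorted = tableEnv.sqlQuery(
>             "SELECT trackId, track_title, started_at_ts FROM play ORDER BY started_at_ts");
>
>         // ORDER BY cnt DESC (a regular column) combined with LIMIT is what
>         // triggers the exception shown below.
>     }
> }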
>
>
>
> On Thu, Sep 27, 2018 at 9:05 PM 徐涛 <happydexu...@gmail.com> wrote:
>
>> Hi,
>> I want a top-n result for each hop window, but an error is thrown when I add
>> the ORDER BY clause or the LIMIT clause, so how do I implement such a case?
>> Thanks a lot.
>>
>> SELECT
>>     trackId AS id, track_title AS description, COUNT(*) AS cnt
>> FROM
>>     play
>> WHERE
>>     appName = 'play.statistics.trace'
>> GROUP BY
>>     HOP(started_at_ts, INTERVAL '1' SECOND, INTERVAL '5' MINUTE), trackId, track_title
>> ORDER BY
>>     cnt DESC
>> LIMIT 10
>>
>>
>> FlinkLogicalSort(sort0=[$2], dir0=[DESC])
>>   FlinkLogicalWindowAggregate(group=[{1, 2}], cnt=[COUNT()])
>>     FlinkLogicalCalc(expr#0..4=[{inputs}],
>> expr#5=[_UTF-16LE'play.statistics.trace'], expr#6=[=($t0, $t5)],
>> started_at_ts=[$t4], trackId=[$t1], track_title=[$t2], $condition=[$t6])
>>       FlinkLogicalNativeTableScan(table=[[play]])
>>
>> This exception indicates that the query uses an unsupported SQL feature.
>> Please check the documentation for the set of currently supported SQL features.
>> at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:275)
>> at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:845)
>> at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:892)
>> at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
>> at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:786)
>> at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:723)
>> at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)
>>
>>
>> Best
>> Henry
>>
>
>
