Hi,

tuple should not be used anywhere in flink-table. @Rong can you point us to the corresponding code? I haven't looked into the code but we should definitely support this query. @Henry feel free to open an issue for it.

Regards,
Timo


Am 28.09.18 um 19:14 schrieb Rong Rong:
Yes.

Thanks for bringing this up Hequn! :-) I think Tuple would not be the best container to use.

However, in search for alternative, shouldn't Collection / List be a more suitable solution? Row seems to not fit in the context (as there can be Rows with elements of different type). I vaguely recall there was similar JIRA but might not be related to IN clause. Let me try to dig it up.

--
Rong

On Fri, Sep 28, 2018 at 9:32 AM Hequn Cheng <chenghe...@gmail.com <mailto:chenghe...@gmail.com>> wrote:

    Hi,

    I haven't look into the code. If this is limited by Tuple, would
    it better to implement it with Row?

    Best, Hequn

    On Fri, Sep 28, 2018 at 9:27 PM Rong Rong <walter...@gmail.com
    <mailto:walter...@gmail.com>> wrote:

        Hi Henry, Vino.

        I think IN operator was translated into either a RexSubQuery
        or a SqlStdOperatorTable.IN operator.
        I think Vino was referring to the first case.
        For the second case (I think that's what you are facing here),
        they are converted into tuples and the maximum we currently
        have in Flink was Tuple25.java, I was wondering if that was
        the issue you are facing. You can probably split the IN into
        many IN combining with OR.

        --
        Rong

        On Fri, Sep 28, 2018 at 2:33 AM vino yang
        <yanghua1...@gmail.com <mailto:yanghua1...@gmail.com>> wrote:

            Hi Henry,

            Maybe the number of elements in your IN clause is out of
            range? Its default value is 20, you can modify it with
            this configuration item:

            /*withInSubQueryThreshold(XXX)*/

            This API comes from Calcite.

            Thanks, vino.

            徐涛 <happydexu...@gmail.com
            <mailto:happydexu...@gmail.com>> 于2018年9月28日周五
            下午4:23写道:

                Hi,

                     When I am executing the following SQL in flink 1.6.1, some 
error throws out saying that it has a support issue, but when I reduce the 
number of integers in the “in” sentence, for example,

                trackId in (124427150,71648998) , Flink does not
                complain anything, so I wonder is there any length
                limit in “in”operation?

                Thanks a lot.

                SELECT
                     trackId as id,track_title as description, count(*) as cnt
                FROM
                     play
                WHERE
                     appName='play.statistics.trace' and
                     trackId in 
(124427150,71648998,124493327,524043,27300837,30300481,27300809,124744768,45982512,124526566,124556427,124804208,74302264,119588973,30496269,27300288,124098818,125071530,120918746,124171456,30413034,124888075,125270551,125434224,27300195,45982342,45982468,45982355,65349883,124705962,65349905,124298305,124889583,45982338,20506255,18556415,122161128,27299018,122850375,124862362,45982336,59613202,122991190,124590280,124867563,45982332,124515944,20506257,122572115,92083574)
                GROUP BY
                     HOP(started_at_ts, INTERVAL '5' SECOND, INTERVAL '5' 
MINUTE),trackId,track_title;



                FlinkLogicalWindowAggregate(group=[{1, 2}], cnt=[COUNT()])
                FlinkLogicalCalc(expr#0..3=[{inputs}],
                started_at_ts=[$t2], trackId=[$t0], track_title=[$t1])
                    FlinkLogicalJoin(condition=[=($0, $3)],
                joinType=[inner])
                FlinkLogicalCalc(expr#0..4=[{inputs}],
                expr#5=[_UTF-16LE'play.statistics.trace'],
                expr#6=[=($t0, $t5)], trackId=[$t1],
                track_title=[$t2], started_at_ts=[$t4], $condition=[$t6])
                FlinkLogicalNativeTableScan(table=[[play]])
                      FlinkLogicalValues(tuples=[[{ 124427150 }, {
                71648998 }, { 124493327 }, { 524043 }, { 27300837 }, {
                30300481 }, { 27300809 }, { 124744768 }, { 45982512 },
                { 124526566 }, { 124556427 }, { 124804208 }, {
                74302264 }, { 119588973 }, { 30496269 }, { 27300288 },
                { 124098818 }, { 125071530 }, { 120918746 }, {
                124171456 }, { 30413034 }, { 124888075 }, { 125270551
                }, { 125434224 }, { 27300195 }, { 45982342 }, {
                45982468 }, { 45982355 }, { 65349883 }, { 124705962 },
                { 65349905 }, { 124298305 }, { 124889583 }, { 45982338
                }, { 20506255 }, { 18556415 }, { 122161128 }, {
                27299018 }, { 122850375 }, { 124862362 }, { 45982336
                }, { 59613202 }, { 122991190 }, { 124590280 }, {
                124867563 }, { 45982332 }, { 124515944 }, { 20506257
                }, { 122572115 }, { 92083574 }]])

                This exception indicates that the query uses an
                unsupported SQL feature.
                Please check the documentation for the set of
                currently supported SQL features.
                at
                
org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:275)
                at
                
org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:845)
                at
                
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:892)
                at
                
org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
                at
                
org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:786)
                at
                
org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:723)
                at
                
org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)
                at
                
com.ximalaya.flink.dsl.application.FlinkApplication$$anonfun$main$5.apply(FlinkApplication.scala:141)
                at
                
com.ximalaya.flink.dsl.application.FlinkApplication$$anonfun$main$5.apply(FlinkApplication.scala:139)
                at
                
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
                at
                
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
                at
                
com.ximalaya.flink.dsl.application.FlinkApplication$.main(FlinkApplication.scala:139)
                at
                
com.ximalaya.flink.dsl.web.test.DslTestUtils$.executeDslFile(DslTestUtils.scala:69)
                at
                
com.ximalaya.flink.dsl.web.test.PlayCountTest$.main(PlayCountTest.scala:5)
                at
                
com.ximalaya.flink.dsl.web.test.PlayCountTest.main(PlayCountTest.scala)

                Best
                Henry


Reply via email to