I think it would be easier to cast eventTs and r_eventTs to TIMESTAMP and do a
non-windowed join. Something like:

    val sql1 = "select distinct id, cast(eventTs as timestamp) as eventTs, " +
      "count(*) over (partition by id order by eventTs rows" +
      " between 100 preceding and current row) as cnt1 from myTable"
    val sql2 = "select distinct id as r_id, cast(eventTs as timestamp) as r_eventTs, " +
      "count(*) over (partition by id " +
      "order by eventTs rows between 50 preceding and current row) as cnt2 from myTable"
    val left = tEnv.sqlQuery(sql1)
    val right = tEnv.sqlQuery(sql2)
    left.join(right).where("id = r_id && eventTs === r_eventTs")
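For intuition, here is a rough plain-Java model of what the non-windowed join above does once both timestamps are plain TIMESTAMP values. It is not Flink code: the class, method and field names are hypothetical, and timestamps are modelled as epoch milliseconds. It joins on (id, eventTs), keeps both inputs in state with no time-based cleanup, and forwards add/retract messages, which is why it can consume the retract streams that DISTINCT produces.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Flink's implementation): a non-windowed inner join
// on (id, eventTs), where eventTs is a plain TIMESTAMP modelled as epoch millis.
// Both sides stay in state with no time-based cleanup, and add/retract messages
// are propagated. Assumes every retraction refers to a previously added row.
final class NonWindowedJoinSketch {

    static final class Row {
        final long id;
        final long eventTs;
        final long cnt;

        Row(long id, long eventTs, long cnt) {
            this.id = id;
            this.eventTs = eventTs;
            this.cnt = cnt;
        }

        String joinKey() {
            return id + "@" + eventTs;
        }
    }

    private final Map<String, List<Row>> leftState = new HashMap<>();
    private final Map<String, List<Row>> rightState = new HashMap<>();
    // Joined results as "+ id eventTs cnt1 cnt2" (add) or "- ..." (retract).
    final List<String> output = new ArrayList<>();

    void onLeft(boolean isAdd, Row left) {
        update(leftState, isAdd, left);
        for (Row right : rightState.getOrDefault(left.joinKey(), new ArrayList<>())) {
            emit(isAdd, left, right);
        }
    }

    void onRight(boolean isAdd, Row right) {
        update(rightState, isAdd, right);
        for (Row left : leftState.getOrDefault(right.joinKey(), new ArrayList<>())) {
            emit(isAdd, left, right);
        }
    }

    private static void update(Map<String, List<Row>> state, boolean isAdd, Row row) {
        List<Row> rows = state.computeIfAbsent(row.joinKey(), k -> new ArrayList<>());
        if (isAdd) {
            rows.add(row);
            return;
        }
        // A retraction removes one previously added row with the same values.
        for (int i = 0; i < rows.size(); i++) {
            if (rows.get(i).cnt == row.cnt) {
                rows.remove(i);
                break;
            }
        }
    }

    private void emit(boolean isAdd, Row left, Row right) {
        output.add((isAdd ? "+ " : "- ") + left.id + " " + left.eventTs + " "
                + left.cnt + " " + right.cnt);
    }
}
```

The trade-off of this design is that state is never cleaned up by time, so in a real job it keeps growing unless idle state retention is configured for the query.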


Hope this helps.

On Fri, Mar 9, 2018 at 10:51 PM, Timo Walther <twal...@apache.org> wrote:

> Another workaround would be to split the query into two Table API parts.
>
> You could do the join, convert the result into a DataStream, and convert it
> into a Table again. The optimizer does not optimize across DataStream API
> calls.
>
> Casting your eventTs to TIMESTAMP as early as possible should also avoid
> this bug.
>
> Let us know if this helped. I think this bug has a good chance of being fixed
> in 1.5.0, which will be released soon.
>
> Regards,
> Timo
>
> On 3/9/18 at 3:28 PM, Xingcan Cui wrote:
>
> Hi Yan,
>
> I think you could try that as a workaround. Don’t forget to follow
> DataStreamWindowJoin
> <https://github.com/apache/flink/blob/fddedda78ad03f1141f3e32f0e0f39c2e045df0e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala>
> and hold back watermarks as it does. We’ll continue improving the SQL/Table API part.
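Holding back watermarks is needed because a joined row can carry a timestamp older than the current input watermark. A minimal sketch of the arithmetic, assuming a join condition of the form `l.ts BETWEEN r.ts - leftRelativeSize AND r.ts + rightRelativeSize`. The class and parameter names are hypothetical, and the actual operator may compute its delay differently (for example, also accounting for allowed lateness):

```java
// Hypothetical sketch of watermark hold-back for a time-bounded join with
//   l.ts BETWEEN r.ts - leftRelativeSize AND r.ts + rightRelativeSize
// (both sizes assumed non-negative). A joined row may contain a timestamp up to
// max(leftRelativeSize, rightRelativeSize) behind the input watermark, so the
// watermark forwarded downstream is delayed by that amount.
final class WatermarkHoldBackSketch {
    private final long maxOutputDelay;
    private long outputWatermark = Long.MIN_VALUE;

    WatermarkHoldBackSketch(long leftRelativeSize, long rightRelativeSize) {
        this.maxOutputDelay = Math.max(leftRelativeSize, rightRelativeSize);
    }

    // inputWatermark is the combined watermark of both join inputs
    // (the minimum of the two); returns the watermark to forward.
    long onInputWatermark(long inputWatermark) {
        // Guard against underflow for very small input watermarks.
        long delayed = inputWatermark <= Long.MIN_VALUE + maxOutputDelay
                ? Long.MIN_VALUE
                : inputWatermark - maxOutputDelay;
        // Forwarded watermarks must never move backwards.
        outputWatermark = Math.max(outputWatermark, delayed);
        return outputWatermark;
    }
}
```

With sizes of 5000 ms and 2000 ms, an input watermark of 10000 is forwarded as 5000: a right row arriving at 10001 may still join left rows down to 5001.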
>
> Best,
> Xingcan
>
>
> On 9 Mar 2018, at 4:08 AM, Yan Zhou [FDS Science] <yz...@coupang.com>
> wrote:
>
> Hi Xingcan, Timo,
>
> Thanks for the information.
> I am going to convert the result table to a DataStream and follow the logic
> of TimeBoundedStreamInnerJoin to do the timed-window join. Should I do
> this? Are there any concerns from a performance or stability perspective?
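For reference, a simplified plain-Java model of the buffering and cleanup that a time-bounded inner join performs. The class, the bounds and the string output format are hypothetical; it keeps rows in in-memory maps and ignores late rows, whereas the actual TimeBoundedStreamInnerJoin keeps its buffers in Flink state:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a time-bounded (interval) inner join:
//   l.id = r.id AND r.ts - lowerBound <= l.ts <= r.ts + upperBound
// Rows are buffered per key and evicted once the watermark guarantees that no
// future row from the other side can match them. Assumes no row with a
// timestamp below the current watermark arrives later (late rows unhandled).
final class TimeBoundedJoinSketch {

    static final class Row {
        final long id;
        final long ts;

        Row(long id, long ts) {
            this.id = id;
            this.ts = ts;
        }
    }

    private final long lowerBound;
    private final long upperBound;
    private final Map<Long, List<Row>> leftBuffer = new HashMap<>();
    private final Map<Long, List<Row>> rightBuffer = new HashMap<>();
    // Joined pairs formatted as "id:leftTs:rightTs".
    final List<String> output = new ArrayList<>();

    TimeBoundedJoinSketch(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    private boolean matches(Row l, Row r) {
        return l.ts >= r.ts - lowerBound && l.ts <= r.ts + upperBound;
    }

    void onLeft(Row l) {
        leftBuffer.computeIfAbsent(l.id, k -> new ArrayList<>()).add(l);
        for (Row r : rightBuffer.getOrDefault(l.id, new ArrayList<>())) {
            if (matches(l, r)) {
                output.add(l.id + ":" + l.ts + ":" + r.ts);
            }
        }
    }

    void onRight(Row r) {
        rightBuffer.computeIfAbsent(r.id, k -> new ArrayList<>()).add(r);
        for (Row l : leftBuffer.getOrDefault(r.id, new ArrayList<>())) {
            if (matches(l, r)) {
                output.add(r.id + ":" + l.ts + ":" + r.ts);
            }
        }
    }

    void onWatermark(long watermark) {
        // A left row can still meet a future right row (r.ts >= watermark)
        // only while l.ts + lowerBound >= watermark.
        evictOlderThan(leftBuffer, watermark - lowerBound);
        // A right row can still meet a future left row only while
        // r.ts + upperBound >= watermark.
        evictOlderThan(rightBuffer, watermark - upperBound);
    }

    private static void evictOlderThan(Map<Long, List<Row>> buffer, long minTs) {
        Iterator<List<Row>> it = buffer.values().iterator();
        while (it.hasNext()) {
            List<Row> rows = it.next();
            rows.removeIf(row -> row.ts < minTs);
            if (rows.isEmpty()) {
                it.remove();
            }
        }
    }

    int bufferedRows() {
        int n = 0;
        for (List<Row> rows : leftBuffer.values()) n += rows.size();
        for (List<Row> rows : rightBuffer.values()) n += rows.size();
        return n;
    }
}
```

The eviction rule is the part that matters for stability: without watermark-driven cleanup like `onWatermark`, both buffers would grow without bound.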
>
> Best
> Yan
>
> ------------------------------
> *From:* Xingcan Cui <xingc...@gmail.com>
> *Sent:* Thursday, March 8, 2018 8:21:42 AM
> *To:* Timo Walther
> *Cc:* user; Yan Zhou [FDS Science]
> *Subject:* Re: flink sql timed-window join throw "mismatched type"
> AssertionError on rowtime column
>
> Hi Yan & Timo,
>
> this is confirmed to be a bug and I’ve created an issue [1] for it.
>
> Let me explain this query in more detail. In Flink SQL/Table API, the DISTINCT
> keyword is implemented with an aggregation, which outputs a retract
> stream [2]. In that situation, all time-related fields are materialized
> as if they were regular fields (of type TIMESTAMP).
> Currently, due to a semantic problem, a time-windowed join cannot be
> performed on retract streams. But you could try a non-windowed join [3] once
> we fix this.
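A rough plain-Java model of what a retract stream looks like. It uses a hypothetical per-key COUNT rather than DISTINCT, because the update pattern is easiest to see there; each message mirrors the (flag, row) shape of a retract stream, where true marks an add and false a retraction of an earlier result:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a continuously updated grouped
// aggregation (SELECT id, COUNT(*) ... GROUP BY id). Every update to an
// existing result first retracts the old row and then adds the new one:
// "(true, ...)" = add, "(false, ...)" = retract.
final class RetractStreamSketch {
    private final Map<Long, Long> counts = new HashMap<>();
    final List<String> changelog = new ArrayList<>();

    void onInput(long id) {
        Long previous = counts.get(id);
        long updated = previous == null ? 1L : previous + 1L;
        if (previous != null) {
            // The old result for this key is no longer valid.
            changelog.add("(false, " + id + ", " + previous + ")");
        }
        counts.put(id, updated);
        changelog.add("(true, " + id + ", " + updated + ")");
    }
}
```

Since an earlier result can be retracted at any later point, these rows cannot be bounded by watermarks the way append-only rows can, which is one way to see why the time attributes end up materialized as plain TIMESTAMP fields.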
>
> Best,
> Xingcan
>
> [1] https://issues.apache.org/jira/browse/FLINK-8897
> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#table-to-stream-conversion
> [3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
>
> On 8 Mar 2018, at 8:59 PM, Timo Walther <twal...@apache.org> wrote:
>
> Hi Xingcan,
>
> thanks for looking into this. This definitely seems to be a bug, possibly in
> org.apache.flink.table.calcite.RelTimeIndicatorConverter. In any case
> we should create an issue for it.
>
> Regards,
> Timo
>
>
> On 3/8/18 at 7:27 AM, Yan Zhou [FDS Science] wrote:
>
> Hi Xingcan,
>
> Thanks for your help. Attached is sample code that reproduces the
> problem.
> While writing the sample code, I noticed that if I remove the `distinct`
> keyword from the select clause, the AssertionError doesn't occur.
>
> String sql1 = "select distinct id, eventTs, count(*) over (partition by " +
>     "id order by eventTs rows between 100 preceding and current row) as cnt1 " +
>     "from myTable";
>
>
> Best
> Yan
> ------------------------------
>
> *From:* xccui-foxmail <xingc...@gmail.com>
> *Sent:* Wednesday, March 7, 2018 8:10 PM
> *To:* Yan Zhou [FDS Science]
> *Cc:* user@flink.apache.org
> *Subject:* Re: flink sql timed-window join throw "mismatched type"
> AssertionError on rowtime column
>
> Hi Yan,
>
> I’d like to look into this. Can you share more about your queries and the
> full stack trace?
>
> Thanks,
> Xingcan
>
> On 8 Mar 2018, at 11:28 AM, Yan Zhou [FDS Science] <yz...@coupang.com>
> wrote:
>
> Hi experts,
> I am using the Flink Table API to join two tables, which are backed by
> DataStreams. However, I get the assertion error "java.lang.AssertionError:
> mismatched type $1 TIMESTAMP(3)" on the rowtime column. More details below:
>
> There is only one Kafka data source, which is converted to a Table and
> registered. One existing column is set as the rowtime (event time) attribute.
> Two over-window aggregation queries are run against the table, producing two
> result tables. Everything works great so far.
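To make the over-window part concrete, a small plain-Java model of what `count(*) over (partition by id order by eventTs rows between N preceding and current row)` computes, assuming rows reach each partition in event-time order. The class and method names are hypothetical and this is not Flink's operator:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of
//   COUNT(*) OVER (PARTITION BY id ORDER BY eventTs
//                  ROWS BETWEEN n PRECEDING AND CURRENT ROW)
// for rows fed in event-time order per id. Each input row produces exactly
// one result, and earlier results are never updated.
final class RowsOverWindowCountSketch {
    private final int preceding;
    private final Map<Long, Integer> rowsSeen = new HashMap<>();

    RowsOverWindowCountSketch(int preceding) {
        this.preceding = preceding;
    }

    // Returns cnt for this row: the current row plus at most `preceding`
    // earlier rows of the same partition.
    long onRow(long id) {
        int seen = rowsSeen.merge(id, 1, Integer::sum);
        return Math.min(seen, preceding + 1);
    }
}
```

Because each input row yields exactly one result row and earlier results are never changed, the output of this step is append-only.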
> However, when doing a timed-window join of the two result tables on their
> inherited rowtime, Calcite throws "java.lang.AssertionError: mismatched type
> $1 TIMESTAMP(3)". Can someone let me know what the possible cause is?
> FYI, I renamed the rowtime column in one of the result tables.
>
> DataStream<MyObject> dataStream = env.addSource(kafkaConsumer);
>
> Table table = tableEnv.fromDataStream(dataStream, "col1", "col2", ...);
>
> tableEnv.registerTable(tableName, table);
>
> Table left = tableEnv.sqlQuery("select id, eventTime, count(*) over ... from ...");
>
> Table right = tableEnv.sqlQuery("select id as r_id, eventTime as r_event_time, count(*) over ... from ...");
>
> left.join(right).where("id = r_id && eventTime === r_event_time")
>     .addSink(...); // here Calcite throws java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)
>
> source table
>  |-- id: Long
>  |-- eventTime: TimeIndicatorTypeInfo(rowtime)
>  |-- ...
>  |-- ...
>
> result_1 table
>  |-- id: Long
>  |-- eventTime: TimeIndicatorTypeInfo(rowtime)
>  |-- ...
>  |-- ...
>
> result_2 table
>  |-- r_id: Long
>  |-- r_event_time: TimeIndicatorTypeInfo(rowtime)
>  |-- ...
>
>
> Best
> Yan
