I think it would be easier if we cast eventTs and r_eventTs as TIMESTAMP and do a non-windowed join. Something like:
    val sql1 = "select distinct id, cast(eventTs as timestamp) as eventTs, " +
      "count(*) over (partition by id order by eventTs rows " +
      "between 100 preceding and current row) as cnt1 from myTable"
    val sql2 = "select distinct id as r_id, cast(eventTs as timestamp) as r_eventTs, " +
      "count(*) over (partition by id " +
      "order by eventTs rows between 50 preceding and current row) as cnt2 from myTable"
    val left = tEnv.sqlQuery(sql1)
    val right = tEnv.sqlQuery(sql2)
    left.join(right).where("id = r_id && eventTs === r_eventTs")

Hope this helps.

On Fri, Mar 9, 2018 at 10:51 PM, Timo Walther <twal...@apache.org> wrote:

> Another workaround would be to split the query into two Table API parts.
> You could do the join, convert the result into a DataStream, and convert
> it into a Table again. The optimizer does not optimize across DataStream
> API calls.
>
> What should also work is casting your eventTs to TIMESTAMP as early as
> possible to prevent this bug.
>
> Let us know if this helped. I think this bug has a good chance of being
> fixed in 1.5.0, which will be released soon.
>
> Regards,
> Timo
>
> Am 3/9/18 um 3:28 PM schrieb Xingcan Cui:
>
> Hi Yan,
>
> I think you could try that as a workaround. Don't forget to follow the
> DataStreamWindowJoin
> <https://github.com/apache/flink/blob/fddedda78ad03f1141f3e32f0e0f39c2e045df0e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala>
> to hold back watermarks. We'll continue improving the SQL/Table API part.
>
> Best,
> Xingcan
>
> On 9 Mar 2018, at 4:08 AM, Yan Zhou [FDS Science] <yz...@coupang.com> wrote:
>
> Hi Xingcan, Timo,
>
> Thanks for the information.
> I am going to convert the result table to a DataStream and follow the
> logic of TimeBoundedStreamInnerJoin to do the time-windowed join. Should
> I do this? Is there any concern from a performance or stability
> perspective?
>
> Best
> Yan
>
> ------------------------------
> *From:* Xingcan Cui <xingc...@gmail.com>
> *Sent:* Thursday, March 8, 2018 8:21:42 AM
> *To:* Timo Walther
> *Cc:* user; Yan Zhou [FDS Science]
> *Subject:* Re: flink sql timed-window join throw "mismatched type"
> AssertionError on rowtime column
>
> Hi Yan & Timo,
>
> this is confirmed to be a bug and I've created an issue [1] for it.
>
> I'll explain more about this query. In Flink SQL/Table API, the DISTINCT
> keyword is implemented with an aggregation, which outputs a retract
> stream [2]. In that situation, all the time-related fields are
> materialized as if they were common fields (with the timestamp type).
> Currently, due to this semantics problem, the time-windowed join cannot
> be performed on retract streams. But you could try the non-windowed join
> [3] after we fix this.
>
> Best,
> Xingcan
>
> [1] https://issues.apache.org/jira/browse/FLINK-8897
> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#table-to-stream-conversion
> [3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
>
> On 8 Mar 2018, at 8:59 PM, Timo Walther <twal...@apache.org> wrote:
>
> Hi Xingcan,
>
> thanks for looking into this. This definitely seems to be a bug, maybe
> in org.apache.flink.table.calcite.RelTimeIndicatorConverter. In any case
> we should create an issue for it.
>
> Regards,
> Timo
>
> Am 3/8/18 um 7:27 AM schrieb Yan Zhou [FDS Science]:
>
> Hi Xingcan,
>
> Thanks for your help. Attached is sample code that reproduces the
> problem. While writing the sample code, I noticed that if I remove the
> `distinct` keyword from the select clause, the AssertionError does not
> occur.
>
> String sql1 = "select distinct id, eventTs, count(*) over (partition by
> id order by eventTs rows between 100 preceding and current row) as cnt1
> from myTable";
>
> Best
> Yan
>
> ------------------------------
> *From:* xccui-foxmail <xingc...@gmail.com>
> *Sent:* Wednesday, March 7, 2018 8:10 PM
> *To:* Yan Zhou [FDS Science]
> *Cc:* user@flink.apache.org
> *Subject:* Re: flink sql timed-window join throw "mismatched type"
> AssertionError on rowtime column
>
> Hi Yan,
>
> I'd like to look into this. Can you share more about your queries and
> the full stack trace?
>
> Thanks,
> Xingcan
>
> On 8 Mar 2018, at 11:28 AM, Yan Zhou [FDS Science] <yz...@coupang.com> wrote:
>
> Hi experts,
>
> I am using the Flink Table API to join two tables, which are data
> streams underneath. However, I got an assertion error of
> "java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)" on the
> rowtime column. Below are more details:
>
> There is only one Kafka data source, which is then converted to a Table
> and registered. One existing column is set as the rowtime (event time)
> attribute. Two over-window aggregation queries are run against the
> table, and two tables are created as results. Everything works great so
> far. However, when time-window joining the two result tables with the
> inherited rowtime, Calcite throws the "java.lang.AssertionError:
> mismatched type $1 TIMESTAMP(3)" AssertionError. Can someone let me know
> what the possible cause is? F.Y.I., I renamed the rowtime column for one
> of the result tables.
>
> DataStream<MyObject> dataStream = env.addSource(kafkaConsumer);
> Table table = tableEnv.fromDataStream(dataStream, "col1", "col2", ...);
> tableEnv.registerTable(tableName, table);
> Table left = tableEnv.sqlQuery("select id, eventTime, count(*) over ... from ...");
> Table right = tableEnv.sqlQuery("select id as r_id, eventTime as r_event_time, count(*) over ... from ...");
> left.join(right).where("id = r_id && eventTime === r_event_time")
>     .addSink(...); // here Calcite throws: java.lang.AssertionError:
>                    // mismatched type $1 TIMESTAMP(3)
>
> source table
> |-- id: Long
> |-- eventTime: TimeIndicatorTypeInfo(rowtime)
> |-- ...
> |-- ...
>
> result_1 table
> |-- id: Long
> |-- eventTime: TimeIndicatorTypeInfo(rowtime)
> |-- ...
> |-- ...
>
> result_2 table
> |-- r_id: Long
> |-- r_event_time: TimeIndicatorTypeInfo(rowtime)
> |-- ...
>
> Best
> Yan
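For reference, Timo's second workaround from earlier in the thread (run part of the query, drop to the DataStream API, and re-register the result as a new table so the optimizer cannot merge the two parts) could be sketched in Scala roughly as follows. This is only a sketch under assumptions: a Flink 1.4-style setup, the table and field names (myTable, id, eventTs) from the thread's queries, and a query part without DISTINCT so the result is append-only and toAppendStream applies.

```
// Sketch of the "optimization barrier" workaround (assumptions: Flink
// 1.4.x Scala Table API; registered table "myTable" with fields id and
// eventTs as in the thread's examples).
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// First Table API part: the over-window aggregation. No DISTINCT here,
// so the result is an append-only stream.
val part1 = tEnv.sqlQuery(
  "select id, eventTs, count(*) over (partition by id order by eventTs " +
  "rows between 100 preceding and current row) as cnt1 from myTable")

// Round-trip through the DataStream API. The optimizer does not optimize
// across DataStream API calls, and the rowtime attribute is materialized
// as a plain TIMESTAMP field in the process.
val part1Stream = part1.toAppendStream[Row]
tEnv.registerTable("part1", tEnv.fromDataStream(part1Stream))

// The second part of the query (e.g. the join) can now run on "part1".
```

Note that if the query part keeps DISTINCT, it produces a retract stream and toRetractStream would be needed instead, as Xingcan's explanation of the bug suggests.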