Hi Yan,

I think you could try that as a workaround. Don’t forget to follow the DataStreamWindowJoin implementation <https://github.com/apache/flink/blob/fddedda78ad03f1141f3e32f0e0f39c2e045df0e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala> to hold back watermarks. We’ll continue improving the SQL/Table API part.
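A time-bounded join done by hand on DataStreams comes down to two things. First, rows from both sides are buffered until the watermark shows they can no longer find a partner. Second, the forwarded watermark is held back so that joined rows are not late for downstream operators. The sketch below models both in plain Java, with no Flink dependency. It illustrates the idea and is not the code of DataStreamWindowJoin or TimeBoundedStreamInnerJoin. It rests on these assumptions:

- The class and method names are hypothetical.
- The join condition is `l.ts + lowerBound <= r.ts <= l.ts + upperBound` on an equal key.
- Input rows are never late.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical plain-Java model (not Flink API) of a time-bounded inner join on an
 * equality key: left row l and right row r join iff l.key == r.key and
 * l.ts + lowerBound <= r.ts <= l.ts + upperBound. Input rows are assumed never late.
 */
final class BoundedJoinSketch {

    /** A buffered input row: join key plus event timestamp in milliseconds. */
    static final class Event {
        final long key;
        final long ts;

        Event(long key, long ts) {
            this.key = key;
            this.ts = ts;
        }
    }

    private final long lowerBound;
    private final long upperBound;
    private final Map<Long, List<Event>> leftBuffer = new HashMap<>();
    private final Map<Long, List<Event>> rightBuffer = new HashMap<>();
    /** Joined pairs, each stored as {key, leftTs, rightTs}. */
    final List<long[]> output = new ArrayList<>();

    BoundedJoinSketch(long lowerBound, long upperBound) {
        if (lowerBound > upperBound) {
            throw new IllegalArgumentException("lowerBound must not exceed upperBound");
        }
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    /** How far a joined pair's older timestamp can trail the watermark at emission time. */
    long watermarkDelay() {
        return Math.max(0L, Math.max(upperBound, -lowerBound));
    }

    void onLeft(long key, long ts) {
        // Probe buffered right rows with the same key, then buffer this left row.
        for (Event r : rightBuffer.getOrDefault(key, Collections.emptyList())) {
            if (ts + lowerBound <= r.ts && r.ts <= ts + upperBound) {
                output.add(new long[] {key, ts, r.ts});
            }
        }
        leftBuffer.computeIfAbsent(key, k -> new ArrayList<>()).add(new Event(key, ts));
    }

    void onRight(long key, long ts) {
        // Probe buffered left rows with the same key, then buffer this right row.
        for (Event l : leftBuffer.getOrDefault(key, Collections.emptyList())) {
            if (l.ts + lowerBound <= ts && ts <= l.ts + upperBound) {
                output.add(new long[] {key, l.ts, ts});
            }
        }
        rightBuffer.computeIfAbsent(key, k -> new ArrayList<>()).add(new Event(key, ts));
    }

    /** Drops rows that can no longer match and returns the held-back watermark to forward. */
    long onWatermark(long watermark) {
        // A left row l can only match future right rows with ts <= l.ts + upperBound.
        expire(leftBuffer, watermark, upperBound);
        // A right row r can only match future left rows with ts <= r.ts - lowerBound.
        expire(rightBuffer, watermark, -lowerBound);
        // Every pair emitted from now on has both timestamps above this value.
        return watermark - watermarkDelay();
    }

    int bufferedRows() {
        int n = 0;
        for (List<Event> rows : leftBuffer.values()) n += rows.size();
        for (List<Event> rows : rightBuffer.values()) n += rows.size();
        return n;
    }

    private static void expire(Map<Long, List<Event>> buffer, long watermark, long reach) {
        for (Iterator<List<Event>> it = buffer.values().iterator(); it.hasNext(); ) {
            List<Event> rows = it.next();
            rows.removeIf(e -> e.ts + reach <= watermark);
            if (rows.isEmpty()) it.remove();
        }
    }
}
```

With bounds [-5, 10], a pair such as (left 100, right 108) can be emitted while the watermark is already near 108. The forwarded watermark is therefore delayed by 10 ms. The buffers in the sketch also show where the state cost lies: state size grows with the bound width, the input rate, and the number of keys. The linked Flink operator holds back watermarks for the same reason; its source is the place to check the exact delay it uses.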
Best,
Xingcan

> On 9 Mar 2018, at 4:08 AM, Yan Zhou [FDS Science] <yz...@coupang.com> wrote:
>
> Hi Xingcan, Timo,
>
> Thanks for the information.
> I am going to convert the result table to a DataStream and follow the logic of
> TimeBoundedStreamInnerJoin to do the time-windowed join. Should I do this? Is
> there any concern from a performance or stability perspective?
>
> Best
> Yan
>
> From: Xingcan Cui <xingc...@gmail.com>
> Sent: Thursday, March 8, 2018 8:21:42 AM
> To: Timo Walther
> Cc: user; Yan Zhou [FDS Science]
> Subject: Re: flink sql timed-window join throw "mismatched type"
> AssertionError on rowtime column
>
> Hi Yan & Timo,
>
> This is confirmed to be a bug, and I’ve created an issue [1] for it.
>
> I’ll explain more about this query. In Flink SQL/Table API, the DISTINCT
> keyword is implemented with an aggregation, which outputs a retract
> stream [2]. In that situation, all the time-related fields are
> materialized as if they were common fields (with the timestamp type).
> Currently, for semantic reasons, the time-windowed join cannot be
> performed on retract streams. But you could try a non-windowed join [3]
> once we fix this.
>
> Best,
> Xingcan
>
> [1] https://issues.apache.org/jira/browse/FLINK-8897
> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#table-to-stream-conversion
> [3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
>
>> On 8 Mar 2018, at 8:59 PM, Timo Walther <twal...@apache.org> wrote:
>>
>> Hi Xingcan,
>>
>> Thanks for looking into this. This definitely seems to be a bug, maybe in
>> org.apache.flink.table.calcite.RelTimeIndicatorConverter.
In any case we >> should create an issue for it. >> >> Regards, >> Timo >> >> >> Am 3/8/18 um 7:27 AM schrieb Yan Zhou [FDS Science]: >>> Hi Xingcan, >>> >>> Thanks for your help. Attached is a sample code that can reproduce the >>> problem. >>> When I was writing the sample code, if I remove the `distinct` keyword in >>> select clause, the AssertionError doesn't occur. >>> >>> String sql1 = "select distinct id, eventTs, count(*) over (partition by id >>> order by eventTs rows between 100 preceding and current row) as cnt1 from >>> myTable"; >>> >>> Best >>> Yan >>> >>> From: xccui-foxmail <xingc...@gmail.com> <mailto:xingc...@gmail.com> >>> Sent: Wednesday, March 7, 2018 8:10 PM >>> To: Yan Zhou [FDS Science] >>> Cc: user@flink.apache.org <mailto:user@flink.apache.org> >>> Subject: Re: flink sql timed-window join throw "mismatched type" >>> AssertionError on rowtime column >>> >>> Hi Yan, >>> >>> I’d like to look into this. Can you share more about your queries and the >>> full stack trace? >>> >>> Thank, >>> Xingcan >>> >>>> On 8 Mar 2018, at 11:28 AM, Yan Zhou [FDS Science] <yz...@coupang.com >>>> <mailto:yz...@coupang.com>> wrote: >>>> >>>> Hi experts, >>>> I am using flink table api to join two tables, which are datastream >>>> underneath. However, I got an assertion error of >>>> "java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)" on rowtime >>>> column. Below is more details: >>>> >>>> There in only one kafka data source, which is then converted to Table and >>>> registered. One existed column is set as rowtime(event time) attribute. >>>> Two over-window aggregation queries are run against the table and two >>>> tables are created as results. Everything works great so far. >>>> However when timed-window joining two result tables with inherented >>>> rowtime, calcite throw the "java.lang.AssertionError: mismatched type $1 >>>> TIMESTAMP(3)" AssertionError. Can someone let me know what is the possible >>>> cause? 
>>>> FYI, I renamed the rowtime column in one of the result tables.
>>>>
>>>> DataStream<MyObject> dataStream = env.addSource(kafkaConsumer);
>>>> Table table = tableEnv.fromDataStream(dataStream, "col1", "col2", ...);
>>>> tableEnv.registerTable(tableName, table);
>>>> Table left = tableEnv.sqlQuery("select id, eventTime, count(*) over ... from ...");
>>>> Table right = tableEnv.sqlQuery("select id as r_id, eventTime as r_event_time, count(*) over ... from ...");
>>>> left.join(right).where("id = r_id && eventTime === r_event_time")
>>>>     .addSink(...); // here Calcite throws an exception: java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)
>>>>
>>>> source table
>>>> |-- id: Long
>>>> |-- eventTime: TimeIndicatorTypeInfo(rowtime)
>>>> |-- ...
>>>> |-- ...
>>>>
>>>> result_1 table
>>>> |-- id: Long
>>>> |-- eventTime: TimeIndicatorTypeInfo(rowtime)
>>>> |-- ...
>>>> |-- ...
>>>>
>>>> result_2 table
>>>> |-- rid: Long
>>>> |-- r_event_time: TimeIndicatorTypeInfo(rowtime)
>>>> |-- ...
>>>>
>>>>
>>>> Best
>>>> Yan
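Xingcan explained that DISTINCT is executed as an aggregation whose output is a retract stream [2]. A small plain-Java model can illustrate that stream. Each change message pairs a flag with a row. `true` adds a row, and `false` withdraws a row that was emitted earlier. An updated result is encoded as a retraction followed by an addition. The class and method names below are hypothetical, and a per-key COUNT(*) stands in for the aggregation. This is a sketch of the encoding described in [2], not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java model (not Flink API) of the retract stream of a per-key COUNT(*). */
final class RetractStreamSketch {

    /** One change message: add == true inserts the row, add == false withdraws an earlier one. */
    static final class Change {
        final boolean add;
        final String key;
        final long count;

        Change(boolean add, String key, long count) {
            this.add = add;
            this.key = key;
            this.count = count;
        }

        @Override
        public String toString() {
            return (add ? "+" : "-") + "(" + key + "," + count + ")";
        }
    }

    private final Map<String, Long> counts = new HashMap<>();
    final List<Change> changelog = new ArrayList<>();

    /** Processes one input row and appends the resulting change messages. */
    void onRow(String key) {
        Long previous = counts.get(key);
        if (previous != null) {
            // The old aggregate row was already sent downstream, so it is retracted first.
            changelog.add(new Change(false, key, previous));
        }
        long updated = previous == null ? 1L : previous + 1L;
        counts.put(key, updated);
        changelog.add(new Change(true, key, updated));
    }

    /** Replays a changelog into the table state a downstream consumer would hold. */
    static Map<String, Long> materialize(List<Change> log) {
        Map<String, Long> state = new HashMap<>();
        for (Change c : log) {
            if (c.add) {
                state.put(c.key, c.count);
            } else {
                state.remove(c.key, c.count);
            }
        }
        return state;
    }
}
```

A consumer of such a stream may have to withdraw rows it has already received, so it cannot treat the stream as append-only. This is consistent with Xingcan's note that the time-windowed join does not accept retract streams.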