Hi Xingcan, Timo,

Thanks for the information.
I am going to convert the result table to DataStream and follow the logic of 
TimeBoundedStreamInnerJoin to do the timed-window join. Should I do this? Is 
there any concern from performance or stability perspective?


From: Xingcan Cui <xingc...@gmail.com>
Sent: Thursday, March 8, 2018 8:21:42 AM
To: Timo Walther
Cc: user; Yan Zhou [FDS Science]
Subject: Re: flink sql timed-window join throw "mismatched type" AssertionError 
on rowtime column

Hi Yan & Timo,

this is confirmed to be a bug and I’ve created an issue [1] for it.

I’ll explain more about this query. In Flink SQL/Table API, the DISTINCT 
keyword will be implemented with an aggregation, which outputs a retract stream 
[2]. In that situation, all the time-related fields will be materialized as if 
they were common fields (with the timestamp type). Currently, due to the 
semantics problem, the time-windowed join cannot be performed on retract 
streams. But you could try non-windowed join [3] after we fix this.


[1] https://issues.apache.org/jira/browse/FLINK-8897

On 8 Mar 2018, at 8:59 PM, Timo Walther 
<twal...@apache.org<mailto:twal...@apache.org>> wrote:

Hi Xingcan,

thanks for looking into this. This definitely seems to be a bug. Maybe in the 
org.apache.flink.table.calcite.RelTimeIndicatorConverter. In any case we should 
create an issue for it.


Am 3/8/18 um 7:27 AM schrieb Yan Zhou [FDS Science]:
Hi Xingcan,

Thanks for your help. Attached is a sample code that can reproduce the problem.
When I was writing the sample code, if I remove the `distinct` keyword in 
select clause, the AssertionError doesn't occur.

String sql1 = "select distinct id, eventTs, count(*) over (partition by id 
order by eventTs rows between 100 preceding and current row) as cnt1 from 

From: xccui-foxmail <xingc...@gmail.com><mailto:xingc...@gmail.com>
Sent: Wednesday, March 7, 2018 8:10 PM
To: Yan Zhou [FDS Science]
Cc: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: flink sql timed-window join throw "mismatched type" AssertionError 
on rowtime column

Hi Yan,

I’d like to look into this. Can you share more about your queries and the full 
stack trace?


On 8 Mar 2018, at 11:28 AM, Yan Zhou [FDS Science] 
<yz...@coupang.com<mailto:yz...@coupang.com>> wrote:

Hi experts,
I am using flink table api to join two tables, which are datastream underneath. 
However, I got an assertion error of "java.lang.AssertionError: mismatched type 
$1 TIMESTAMP(3)" on rowtime column. Below is more details:

There in only one kafka data source, which is then converted to Table and 
registered. One existed column is set as rowtime(event time) attribute. Two 
over-window aggregation queries are run against the table and two tables are 
created as results. Everything works great so far.
However when timed-window joining two result tables with inherented rowtime, 
calcite throw the "java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)" 
AssertionError. Can someone let me know what is the possible cause? F.Y.I., I 
rename the rowtime column for one of the result table.

DataStream<MyObject> dataStream = env.addSource(kafkaConsumer);

Table table = tableEnv.fromDataStream(dataStream, "col1", "col2", ...);

tableEnv.registerTable(tableName, table);

Table left = tableEnv.sqlQuery("select id, eventTime,count (*) over ...  from 

Table right = tableEnv.sqlQuery("select id as r_id, eventTime as r_event_time, 
count (*) over ...  from ...");

left.join(right).where("id = r_id && eventTime === r_event_time)

.addSink(...); // here calcite throw exception: java.lang.AssertionError: 
mismatched type $1 TIMESTAMP(3)

source table
 |-- id: Long
 |-- eventTime: TimeIndicatorTypeInfo(rowtime)
 |-- ...
 |-- ...

result_1 table
 |-- id: Long
 |-- eventTime: TimeIndicatorTypeInfo(rowtime)
 |-- ...
 |-- ...

result_2 table
 |-- rid: Long
 |-- r_event_time: TimeIndicatorTypeInfo(rowtime)
 |-- ...


Reply via email to