Hi Xingcan, Timo,

Thanks for the information.
I am going to convert the result tables to DataStreams and follow the logic of 
TimeBoundedStreamInnerJoin to do the timed-window join. Is this a reasonable 
approach? Are there any concerns from a performance or stability perspective?
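
For illustration only, the buffering idea behind a time-bounded inner join can be sketched in plain Java with no Flink dependency. The class name TimeBoundedJoinSketch, the Event type, and the bound parameters are hypothetical, and this is not the code of TimeBoundedStreamInnerJoin. A real operator would also have to expire buffered rows from state, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: joins two keyed streams when their timestamps are close enough.
class TimeBoundedJoinSketch {

    /** A keyed row with an event-time timestamp in milliseconds (illustrative type). */
    static final class Event {
        final long id;
        final long ts;

        Event(long id, long ts) {
            this.id = id;
            this.ts = ts;
        }
    }

    private final long lowerBound; // right.ts - left.ts must be >= lowerBound
    private final long upperBound; // right.ts - left.ts must be <= upperBound
    private final List<Event> leftBuffer = new ArrayList<>();
    private final List<Event> rightBuffer = new ArrayList<>();

    TimeBoundedJoinSketch(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    /** Buffers a left row and returns the pairs it forms with buffered right rows. */
    List<String> onLeft(Event left) {
        leftBuffer.add(left);
        List<String> out = new ArrayList<>();
        for (Event right : rightBuffer) {
            if (matches(left, right)) {
                out.add(format(left, right));
            }
        }
        return out;
    }

    /** Buffers a right row and returns the pairs it forms with buffered left rows. */
    List<String> onRight(Event right) {
        rightBuffer.add(right);
        List<String> out = new ArrayList<>();
        for (Event left : leftBuffer) {
            if (matches(left, right)) {
                out.add(format(left, right));
            }
        }
        return out;
    }

    // Join condition: same key and a timestamp difference inside the bounds.
    private boolean matches(Event left, Event right) {
        long diff = right.ts - left.ts;
        return left.id == right.id && diff >= lowerBound && diff <= upperBound;
    }

    private static String format(Event left, Event right) {
        return left.id + "@" + left.ts + "|" + right.id + "@" + right.ts;
    }

    public static void main(String[] args) {
        TimeBoundedJoinSketch join = new TimeBoundedJoinSketch(-1000L, 1000L);
        System.out.println(join.onLeft(new Event(1L, 5000L)));  // no right rows buffered yet
        System.out.println(join.onRight(new Event(1L, 5500L))); // within the one-second bound
        System.out.println(join.onRight(new Event(1L, 7000L))); // outside the bound
    }
}
```

Because both buffers grow without limit here, the sketch also hints at where performance and stability concerns would come from: state size depends on how long rows are kept.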

Best
Yan


________________________________
From: Xingcan Cui <xingc...@gmail.com>
Sent: Thursday, March 8, 2018 8:21:42 AM
To: Timo Walther
Cc: user; Yan Zhou [FDS Science]
Subject: Re: flink sql timed-window join throw "mismatched type" AssertionError 
on rowtime column

Hi Yan & Timo,

this is confirmed to be a bug and I’ve created an issue [1] for it.

I’ll explain more about this query. In the Flink SQL/Table API, the DISTINCT 
keyword is implemented as an aggregation, which outputs a retract stream 
[2]. In that situation, all time-related fields are materialized as if 
they were regular fields (with the timestamp type). Currently, due to a 
semantics problem, the time-windowed join cannot be performed on retract 
streams. But you could try a non-windowed join [3] after we fix this.
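
To make the retract-stream point more concrete, here is a minimal plain-Java sketch (no Flink dependency) of how an aggregation emits change messages. The class RetractStreamSketch and its string output format are hypothetical. The only thing taken from Flink is the encoding described in [2]: a `true` flag marks an add message, a `false` flag marks a retract message, and an update is a retraction of the previous row followed by an add of the new one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a per-key count that reports its changes as add/retract messages.
class RetractStreamSketch {

    private final Map<Long, Long> counts = new HashMap<>();

    /** Processes one input row for the given key and returns the resulting change messages. */
    List<String> onRow(long id) {
        List<String> out = new ArrayList<>();
        Long previous = counts.get(id);
        long updated = (previous == null) ? 1L : previous + 1L;
        if (previous != null) {
            // The old result is no longer valid, so it is retracted first.
            out.add("(false, " + id + ", " + previous + ")");
        }
        // The new result is then added.
        out.add("(true, " + id + ", " + updated + ")");
        counts.put(id, updated);
        return out;
    }

    public static void main(String[] args) {
        RetractStreamSketch agg = new RetractStreamSketch();
        System.out.println(agg.onRow(7L)); // first row for the key: add only
        System.out.println(agg.onRow(7L)); // second row: retract the old count, add the new one
    }
}
```

A row that may later be retracted has no fixed place in event time, which is one way to see why joining on a time window over such a stream is problematic.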

Best,
Xingcan

[1] https://issues.apache.org/jira/browse/FLINK-8897
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#table-to-stream-conversion
[3] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins

On 8 Mar 2018, at 8:59 PM, Timo Walther <twal...@apache.org> wrote:

Hi Xingcan,

thanks for looking into this. This definitely seems to be a bug, possibly in 
org.apache.flink.table.calcite.RelTimeIndicatorConverter. In any case, we should 
create an issue for it.

Regards,
Timo


On 3/8/18 at 7:27 AM, Yan Zhou [FDS Science] wrote:
Hi Xingcan,

Thanks for your help. Attached is sample code that reproduces the problem.
While writing it, I found that if I remove the `distinct` keyword from the 
select clause, the AssertionError doesn't occur.

String sql1 = "select distinct id, eventTs, "
    + "count(*) over (partition by id order by eventTs "
    + "rows between 100 preceding and current row) as cnt1 "
    + "from myTable";

Best
Yan
________________________________
From: xccui-foxmail <xingc...@gmail.com>
Sent: Wednesday, March 7, 2018 8:10 PM
To: Yan Zhou [FDS Science]
Cc: user@flink.apache.org
Subject: Re: flink sql timed-window join throw "mismatched type" AssertionError 
on rowtime column

Hi Yan,

I’d like to look into this. Can you share more about your queries and the full 
stack trace?

Thanks,
Xingcan

On 8 Mar 2018, at 11:28 AM, Yan Zhou [FDS Science] <yz...@coupang.com> wrote:

Hi experts,
I am using the Flink Table API to join two tables, which are DataStreams 
underneath. However, I got the assertion error "java.lang.AssertionError: 
mismatched type $1 TIMESTAMP(3)" on the rowtime column. More details are below:

There is only one Kafka data source, which is converted to a Table and 
registered. One existing column is set as the rowtime (event time) attribute. 
Two over-window aggregation queries are run against the table, and two tables 
are created as results. Everything works great so far.
However, when I run a timed-window join on the two result tables with the 
inherited rowtime, Calcite throws "java.lang.AssertionError: mismatched type 
$1 TIMESTAMP(3)". Can someone let me know the possible cause? FYI, I renamed 
the rowtime column in one of the result tables.


DataStream<MyObject> dataStream = env.addSource(kafkaConsumer);

Table table = tableEnv.fromDataStream(dataStream, "col1, col2, ...");

tableEnv.registerTable(tableName, table);

Table left = tableEnv.sqlQuery(
    "select id, eventTime, count(*) over ... from ...");

Table right = tableEnv.sqlQuery(
    "select id as r_id, eventTime as r_event_time, count(*) over ... from ...");

left.join(right).where("id = r_id && eventTime === r_event_time")
    // Calcite throws the exception here:
    // java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)
    .addSink(...);

source table
 |-- id: Long
 |-- eventTime: TimeIndicatorTypeInfo(rowtime)
 |-- ...
 |-- ...

result_1 table
 |-- id: Long
 |-- eventTime: TimeIndicatorTypeInfo(rowtime)
 |-- ...
 |-- ...

result_2 table
 |-- r_id: Long
 |-- r_event_time: TimeIndicatorTypeInfo(rowtime)
 |-- ...


Best
Yan



