Hi Yan,

I think you could try that as a workaround. Don’t forget to hold back the
watermarks, as DataStreamWindowJoin does:
<https://github.com/apache/flink/blob/fddedda78ad03f1141f3e32f0e0f39c2e045df0e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala>
We’ll continue improving the SQL/Table API part.
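
A rough illustration of why the watermark has to be held back: a joined row can carry a timestamp that is already behind the input watermark, so forwarding that watermark unchanged would make the row late downstream. The sketch below is plain Java without Flink. The bound convention (lowerBound <= right.ts - left.ts <= upperBound) and the rule that the delay equals the larger of the two relative window sizes are assumptions based on our reading of the time-bounded join, not Flink's actual code, and the class and method names are hypothetical.

```java
/**
 * Hypothetical, dependency-free sketch of watermark holdback for a
 * time-bounded join. Assumes rows match when
 * lowerBound <= right.ts - left.ts <= upperBound (all values in ms).
 */
public class WatermarkHoldback {

    /** Largest distance a joined row's timestamp can lag behind the newer input row. */
    public static long maxOutputDelay(long lowerBound, long upperBound) {
        long leftRelativeSize = -lowerBound; // how far a right row may precede its left row
        long rightRelativeSize = upperBound; // how far a right row may follow its left row
        return Math.max(leftRelativeSize, rightRelativeSize);
    }

    /** Watermark to forward downstream instead of the raw input watermark. */
    public static long heldBackWatermark(long inputWatermark, long lowerBound, long upperBound) {
        return inputWatermark - maxOutputDelay(lowerBound, upperBound);
    }

    public static void main(String[] args) {
        long lower = -5_000L; // hypothetical: right row up to 5 s before the left row
        long upper = 2_000L;  // hypothetical: right row up to 2 s after the left row
        long inputWatermark = 100_000L;

        // A buffered right row at 95_500 can still join a new, on-time left row
        // at 100_500 (difference -5_000 is within bounds). If the joined row keeps
        // the older timestamp, it is behind the raw watermark but not the held-back one.
        long joinedRowTs = 95_500L;
        long outWatermark = heldBackWatermark(inputWatermark, lower, upper);

        System.out.println("held-back watermark: " + outWatermark);                       // 95000
        System.out.println("late vs raw watermark: " + (joinedRowTs <= inputWatermark));  // true
        System.out.println("late vs held-back: " + (joinedRowTs <= outWatermark));        // false
    }
}
```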

Best,
Xingcan


> On 9 Mar 2018, at 4:08 AM, Yan Zhou [FDS Science] <yz...@coupang.com> wrote:
> 
> Hi Xingcan, Timo, 
> 
> Thanks for the information. 
> I am going to convert the result tables to DataStreams and follow the logic of
> TimeBoundedStreamInnerJoin to do the timed-window join. Is this a reasonable
> approach? Are there any concerns from a performance or stability perspective?
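
For reference, a heavily reduced sketch of what such a join has to do: buffer rows from both sides, join each new row against the other side's buffer within the time bounds, and drop buffered rows once the watermark guarantees that no future on-time row can match them. It is plain Java without Flink state, timers, or keyed partitioning (the key is simply compared), so it is not TimeBoundedStreamInnerJoin itself. The class and method names (BoundedJoinSketch, processLeft, processRight, onWatermark) are hypothetical, and the bound convention lowerBound <= right.ts - left.ts <= upperBound is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single-operator sketch of a time-bounded inner join (times in ms). */
public class BoundedJoinSketch {

    /** A row with a join key and an event-time timestamp. */
    public static final class Row {
        final long id;
        final long ts;
        public Row(long id, long ts) { this.id = id; this.ts = ts; }
    }

    private final long lowerBound; // assumed: right.ts - left.ts >= lowerBound
    private final long upperBound; // assumed: right.ts - left.ts <= upperBound
    private final List<Row> leftBuffer = new ArrayList<>();
    private final List<Row> rightBuffer = new ArrayList<>();
    /** Joined pairs recorded as "leftTs-rightTs" for easy inspection. */
    public final List<String> output = new ArrayList<>();

    public BoundedJoinSketch(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    private boolean matches(Row left, Row right) {
        long diff = right.ts - left.ts;
        return left.id == right.id && diff >= lowerBound && diff <= upperBound;
    }

    /** Buffer the left row and join it with every matching buffered right row. */
    public void processLeft(Row left) {
        leftBuffer.add(left);
        for (Row right : rightBuffer) {
            if (matches(left, right)) output.add(left.ts + "-" + right.ts);
        }
    }

    /** Buffer the right row and join it with every matching buffered left row. */
    public void processRight(Row right) {
        rightBuffer.add(right);
        for (Row left : leftBuffer) {
            if (matches(left, right)) output.add(left.ts + "-" + right.ts);
        }
    }

    /** Evict rows that no future on-time row (ts > watermark) can match. */
    public void onWatermark(long watermark) {
        // A future right row has ts > watermark, so left rows with ts + upperBound <= watermark are done.
        leftBuffer.removeIf(l -> l.ts + upperBound <= watermark);
        // A future left row has ts > watermark, so right rows with ts - lowerBound <= watermark are done.
        rightBuffer.removeIf(r -> r.ts - lowerBound <= watermark);
    }

    public int bufferedRows() {
        return leftBuffer.size() + rightBuffer.size();
    }

    public static void main(String[] args) {
        BoundedJoinSketch join = new BoundedJoinSketch(-5_000L, 2_000L);
        join.processLeft(new Row(1L, 10_000L));
        join.processRight(new Row(1L, 11_000L)); // diff 1_000: joined
        join.processRight(new Row(1L, 20_000L)); // diff 10_000: outside the bounds
        join.onWatermark(12_000L);               // evicts the left row (10_000 + 2_000 <= 12_000)
        System.out.println(join.output);         // [10000-11000]
        System.out.println(join.bufferedRows()); // 2
    }
}
```

Evicting on the watermark is what keeps the buffers bounded; without it, state would grow with every row ever seen.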
> 
> Best
> Yan
> 
> From: Xingcan Cui <xingc...@gmail.com>
> Sent: Thursday, March 8, 2018 8:21:42 AM
> To: Timo Walther
> Cc: user; Yan Zhou [FDS Science]
> Subject: Re: flink sql timed-window join throw "mismatched type" 
> AssertionError on rowtime column
>  
> Hi Yan & Timo,
> 
> This is confirmed to be a bug, and I’ve created an issue [1] for it.
> 
> Let me explain a bit more about this query. In Flink SQL/Table API, the
> DISTINCT keyword is implemented as an aggregation, which outputs a retract
> stream [2]. In that case, all time-related fields are materialized as regular
> fields (with the TIMESTAMP type). Currently, for semantic reasons, a
> time-windowed join cannot be performed on retract streams. However, you could
> try a non-windowed join [3] once we fix this.
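
To make the retract-stream point concrete: when a table is converted with toRetractStream, each message carries a Boolean flag, true for an added row and false for a retracted one. The plain-Java sketch below (hypothetical class and method names, no Flink dependency) mimics a non-windowed COUNT(*) ... GROUP BY id, which has to withdraw its previously emitted result before emitting the updated one. A time-windowed join cannot consume messages like the false ones produced here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of how a non-windowed aggregation turns into a retract stream. */
public class RetractStreamSketch {

    /** One change message; isAdd mirrors the Boolean flag of a retract stream. */
    public static final class Change {
        public final boolean isAdd;
        public final String row;
        Change(boolean isAdd, String row) { this.isAdd = isAdd; this.row = row; }
        @Override public String toString() { return "(" + isAdd + ", " + row + ")"; }
    }

    private final Map<Long, Long> counts = new HashMap<>();

    /** COUNT(*) GROUP BY id: an update first retracts the previously emitted result row. */
    public List<Change> onInput(long id) {
        List<Change> out = new ArrayList<>();
        Long previous = counts.get(id);
        if (previous != null) {
            out.add(new Change(false, id + "," + previous)); // withdraw the old result
        }
        long current = (previous == null) ? 1L : previous + 1L;
        counts.put(id, current);
        out.add(new Change(true, id + "," + current));       // emit the new result
        return out;
    }

    public static void main(String[] args) {
        RetractStreamSketch agg = new RetractStreamSketch();
        System.out.println(agg.onInput(7L)); // [(true, 7,1)]
        System.out.println(agg.onInput(7L)); // [(false, 7,1), (true, 7,2)]
    }
}
```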
> 
> Best,
> Xingcan
> 
> [1] https://issues.apache.org/jira/browse/FLINK-8897
> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#table-to-stream-conversion
> [3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
> 
>> On 8 Mar 2018, at 8:59 PM, Timo Walther <twal...@apache.org> wrote:
>> 
>> Hi Xingcan,
>> 
>> Thanks for looking into this. This definitely seems to be a bug, possibly in
>> org.apache.flink.table.calcite.RelTimeIndicatorConverter. In any case, we
>> should create an issue for it.
>> 
>> Regards,
>> Timo
>> 
>> 
>> On 3/8/18 at 7:27 AM, Yan Zhou [FDS Science] wrote:
>>> Hi Xingcan,
>>> 
>>> Thanks for your help. Attached is sample code that reproduces the problem.
>>> While writing it, I noticed that if I remove the `distinct` keyword from
>>> the select clause, the AssertionError doesn't occur.
>>> 
>>> String sql1 = "select distinct id, eventTs, "
>>>     + "count(*) over (partition by id order by eventTs "
>>>     + "rows between 100 preceding and current row) as cnt1 "
>>>     + "from myTable";
>>> 
>>> Best
>>> Yan
>>>  
>>> From: xccui-foxmail <xingc...@gmail.com>
>>> Sent: Wednesday, March 7, 2018 8:10 PM
>>> To: Yan Zhou [FDS Science]
>>> Cc: user@flink.apache.org
>>> Subject: Re: flink sql timed-window join throw "mismatched type" 
>>> AssertionError on rowtime column
>>>  
>>> Hi Yan,
>>> 
>>> I’d like to look into this. Can you share more about your queries and the 
>>> full stack trace?
>>> 
>>> Thanks,
>>> Xingcan
>>> 
>>>> On 8 Mar 2018, at 11:28 AM, Yan Zhou [FDS Science] <yz...@coupang.com> wrote:
>>>> 
>>>> Hi experts, 
>>>> I am using the Flink Table API to join two tables, which are backed by
>>>> DataStreams. However, I get the assertion error
>>>> "java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)" on the rowtime
>>>> column. Below are more details:
>>>> 
>>>> There is only one Kafka data source, which is converted to a Table and
>>>> registered. An existing column is set as the rowtime (event-time)
>>>> attribute. Two over-window aggregation queries run against the table,
>>>> producing two result tables. Everything works fine so far.
>>>> However, when I do a timed-window join of the two result tables on their
>>>> inherited rowtime columns, Calcite throws the AssertionError
>>>> "java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)". Can someone
>>>> tell me what the possible cause is? FYI, I renamed the rowtime column in
>>>> one of the result tables.
>>>> 
>>>> DataStream<MyObject> dataStream = env.addSource(kafkaConsumer);
>>>> // Fields are one comma-separated string; the rowtime field gets a .rowtime suffix
>>>> Table table = tableEnv.fromDataStream(dataStream, "col1, col2, ...");
>>>> tableEnv.registerTable(tableName, table);
>>>> Table left = tableEnv.sqlQuery("select id, eventTime, count(*) over ... from ...");
>>>> Table right = tableEnv.sqlQuery("select id as r_id, eventTime as r_event_time, count(*) over ... from ...");
>>>> Table joined = left.join(right).where("id === r_id && eventTime === r_event_time");
>>>> // A Table has no addSink(); convert it to a DataStream first
>>>> tableEnv.toAppendStream(joined, Row.class)
>>>>     .addSink(...); // Calcite throws java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)
>>>> 
>>>> source table
>>>>  |-- id: Long
>>>>  |-- eventTime: TimeIndicatorTypeInfo(rowtime)
>>>>  |-- ...
>>>>  |-- ...
>>>>  
>>>> result_1 table
>>>>  |-- id: Long
>>>>  |-- eventTime: TimeIndicatorTypeInfo(rowtime)
>>>>  |-- ...
>>>>  |-- ...
>>>>  
>>>> result_2 table
>>>>  |-- r_id: Long
>>>>  |-- r_event_time: TimeIndicatorTypeInfo(rowtime)
>>>>  |-- ...
>>>> 
>>>> 
>>>> Best
>>>> Yan
