Another workaround would be to split the query into two Table API parts:
do the join, convert the result into a DataStream, and then convert it
back into a table. The optimizer does not optimize across DataStream API
calls. It should also work to cast your eventTs to TIMESTAMP as early as
possible, which avoids this bug.
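To make the "cast early" suggestion concrete, here is a minimal sketch based on the reproducer query quoted further down in this thread. The class name, the constant name, and the `ts` alias are hypothetical, and the rewritten query has not been run against the reproducer, so treat it as an illustration of the idea rather than a confirmed fix. Note that after the cast the column is an ordinary TIMESTAMP and, as far as I understand, no longer a rowtime attribute.

```java
// Sketch only: a variant of the reproducer's sql1 with eventTs cast to TIMESTAMP
// in the first query. Names here are illustrative, not part of any Flink API.
class CastEarlySketch {

    // The OVER clause still orders by the original rowtime column (eventTs);
    // only the projected column is cast, and it is renamed to ts.
    static final String SQL1_WITH_CAST =
        "select distinct id, cast(eventTs as timestamp) as ts, "
            + "count(*) over (partition by id order by eventTs "
            + "rows between 100 preceding and current row) as cnt1 "
            + "from myTable";

    public static void main(String[] args) {
        // The string would be passed to tableEnv.sqlQuery(...) as in the original post.
        System.out.println(SQL1_WITH_CAST);
    }
}
```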
Let us know if this helped. I think this bug has a good chance of being
fixed in 1.5.0, which will be released soon.
Regards,
Timo
On 3/9/18 at 3:28 PM, Xingcan Cui wrote:
Hi Yan,
I think you could try that as a workaround. Don’t forget to follow the
DataStreamWindowJoin
<https://github.com/apache/flink/blob/fddedda78ad03f1141f3e32f0e0f39c2e045df0e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala> to
hold back watermarks. We’ll continue improving the SQL/Table API part.
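As a rough illustration of why watermarks need to be held back in a time-bounded join, the sketch below models the arithmetic in plain Java. It is a simplified model and not the code in DataStreamWindowJoin: the class, the method names, the bound parameters, and the choice of the larger bound as the delay are all assumptions made for illustration. The idea is that a joined row can carry a timestamp that is older than the current input watermark by up to the join bound, so the forwarded watermark is lowered by that amount to keep such rows from being late downstream.

```java
// Toy model of a time-bounded join condition and the matching watermark delay.
// Assumed join condition: rightTs - lowerMs <= leftTs <= rightTs + upperMs.
class WatermarkHoldbackSketch {

    // True if the two rows fall inside the time bounds of the join.
    static boolean withinBounds(long leftTs, long rightTs, long lowerMs, long upperMs) {
        return leftTs >= rightTs - lowerMs && leftTs <= rightTs + upperMs;
    }

    // Largest timestamp gap between two rows that can still be joined.
    static long maxOutputDelay(long lowerMs, long upperMs) {
        return Math.max(lowerMs, upperMs);
    }

    // Watermark to forward: the slower of the two inputs, lowered by the delay.
    static long heldBackWatermark(long leftWatermark, long rightWatermark, long delayMs) {
        return Math.min(leftWatermark, rightWatermark) - delayMs;
    }

    public static void main(String[] args) {
        long delay = maxOutputDelay(5_000L, 10_000L);
        System.out.println("delay = " + delay);
        System.out.println("watermark = " + heldBackWatermark(60_000L, 58_000L, delay));
    }
}
```

With bounds of 5 s and 10 s and input watermarks of 60000 and 58000, the forwarded watermark in this model is 48000.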
Best,
Xingcan
On 9 Mar 2018, at 4:08 AM, Yan Zhou [FDS Science] <yz...@coupang.com> wrote:
Hi Xingcan, Timo,
Thanks for the information.
I am going to convert the result table to DataStream and follow the
logic of TimeBoundedStreamInnerJoin to do the timed-window join.
Should I do this? Are there any concerns from a performance or stability
perspective?
Best
Yan
------------------------------------------------------------------------
*From:* Xingcan Cui <xingc...@gmail.com>
*Sent:* Thursday, March 8, 2018 8:21:42 AM
*To:* Timo Walther
*Cc:* user; Yan Zhou [FDS Science]
*Subject:* Re: flink sql timed-window join throw "mismatched type"
AssertionError on rowtime column
Hi Yan & Timo,
this is confirmed to be a bug and I’ve created an issue [1] for it.
Let me explain more about this query. In the Flink SQL/Table API, the
DISTINCT keyword is implemented with an aggregation, which outputs a
retract stream [2]. In that situation, all the time-related fields are
materialized as if they were ordinary fields (with the timestamp type).
Currently, due to a semantics problem, the time-windowed join cannot be
performed on retract streams. But you could try a non-windowed join [3]
after we fix this.
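For readers unfamiliar with retract streams, the following plain-Java sketch mimics the message pattern described in [2] for a running COUNT(*) grouped by key: every update to an existing result is sent as a retraction of the old row followed by an addition of the new one. This is a toy model and does not use Flink; the class name, method name, and the "+"/"-" message format are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a retract stream produced by COUNT(*) ... GROUP BY key.
class RetractStreamSketch {

    // Emits "+(key,count)" for an added result row and "-(key,count)" for a retracted one.
    static List<String> countPerKey(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        List<String> messages = new ArrayList<>();
        for (String key : keys) {
            Long previous = counts.get(key);
            if (previous != null) {
                // The earlier result for this key is no longer valid: retract it.
                messages.add("-(" + key + "," + previous + ")");
            }
            long updated = (previous == null) ? 1L : previous + 1L;
            counts.put(key, updated);
            messages.add("+(" + key + "," + updated + ")");
        }
        return messages;
    }

    public static void main(String[] args) {
        // prints [+(a,1), +(b,1), -(a,1), +(a,2)]
        System.out.println(countPerKey(Arrays.asList("a", "b", "a")));
    }
}
```

Because earlier results can be withdrawn later, a downstream operator cannot treat each message as a final, append-only row, which is one way to read the semantics problem mentioned above.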
Best,
Xingcan
[1] https://issues.apache.org/jira/browse/FLINK-8897
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#table-to-stream-conversion
[3]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
On 8 Mar 2018, at 8:59 PM, Timo Walther <twal...@apache.org> wrote:
Hi Xingcan,
thanks for looking into this. This definitely seems to be a bug,
possibly in
org.apache.flink.table.calcite.RelTimeIndicatorConverter. In any
case, we should create an issue for it.
Regards,
Timo
On 3/8/18 at 7:27 AM, Yan Zhou [FDS Science] wrote:
Hi Xingcan,
Thanks for your help. Attached is sample code that can reproduce
the problem.
While writing the sample code, I found that if I remove the `distinct`
keyword from the select clause, the AssertionError doesn't occur.
String sql1 = "select distinct id, eventTs, count(*) over
(partition by id order by eventTs rows between 100 preceding
and current row) as cnt1 from myTable";
Best
Yan
------------------------------------------------------------------------
*From:* xccui-foxmail <xingc...@gmail.com>
*Sent:* Wednesday, March 7, 2018 8:10 PM
*To:* Yan Zhou [FDS Science]
*Cc:* user@flink.apache.org
*Subject:* Re: flink sql timed-window join throw "mismatched type"
AssertionError on rowtime column
Hi Yan,
I’d like to look into this. Can you share more about your queries
and the full stack trace?
Thanks,
Xingcan
On 8 Mar 2018, at 11:28 AM, Yan Zhou [FDS Science]
<yz...@coupang.com> wrote:
Hi experts,
I am using the Flink Table API to join two tables that are backed by
DataStreams. However, I get the assertion error
"java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)" on the
rowtime column. More details are below:
There is only one Kafka data source, which is converted to a
Table and registered. One existing column is set as the rowtime (event
time) attribute. Two over-window aggregation queries are run
against the table, producing two result tables.
Everything works great so far.
However, when I do a timed-window join of the two result tables on the
inherited rowtime, Calcite throws "java.lang.AssertionError:
mismatched type $1 TIMESTAMP(3)". Can someone let
me know what the possible cause is? FYI, I renamed the rowtime
column in one of the result tables.
DataStream<MyObject> dataStream = env.addSource(kafkaConsumer);
Table table = tableEnv.fromDataStream(dataStream, "col1",
"col2", ...);
tableEnv.registerTable(tableName, table);
Table left = tableEnv.sqlQuery("select id, eventTime, count(*)
over ... from ...");
Table right = tableEnv.sqlQuery("select id as r_id, eventTime
as r_event_time, count(*) over ... from ...");
left.join(right).where("id = r_id && eventTime === r_event_time")
.addSink(...); // Calcite throws the exception here:
// java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)
source table
|-- id: Long
|-- eventTime: TimeIndicatorTypeInfo(rowtime)
|-- ...
|-- ...
result_1 table
|-- id: Long
|-- eventTime: TimeIndicatorTypeInfo(rowtime)
|-- ...
|-- ...
result_2 table
|-- rid: Long
|-- r_event_time: TimeIndicatorTypeInfo(rowtime)
|-- ...
Best
Yan