Re: Flink - Create Temporary View and "Rowtime attributes must not be in the input rows of a regular join"

2020-12-17 Thread Timo Walther

Hi Dan,

are you intending to use interval joins, regular joins, or a mixture of 
both?


For regular joins, you must cast the rowtime attribute to TIMESTAMP as
early as possible. For interval joins, you need to make sure that the
rowtime attribute remains unmodified.
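
To illustrate the distinction, a sketch with placeholder table and
column names (not your schema):

-- Interval join: both rowtime attributes appear unmodified in the
-- time-bound predicate, so the planner can choose an interval join.
SELECT a.id, b.payload
FROM a JOIN b
  ON a.id = b.id
  AND a.rowtime BETWEEN b.rowtime - INTERVAL '1' HOUR
                    AND b.rowtime + INTERVAL '1' HOUR;

-- Regular join: cast the rowtime to a plain TIMESTAMP as early as
-- possible, so no time attribute reaches the join.
SELECT a.id, CAST(a.rowtime AS TIMESTAMP(3)) AS a_ts, b.payload
FROM a JOIN b ON a.id = b.id;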


Currently, I see

COALESCE(CAST(input_insertion.ts AS TIMESTAMP(3)), CAST(0 AS 
TIMESTAMP(3))) AS insertion_ts


or

CAST(flat_impression_view.impression_ts AS TIMESTAMP)

which disables interval joins implicitly.

If you would like to keep the interval join properties, you need to do
the casting in a computed column in the CREATE TABLE statement, before
declaring a watermark on it.
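
A minimal sketch of that pattern (field names, types, and connector
options are placeholders, assuming the raw timestamp arrives as a
string):

CREATE TABLE input_insertion (
  insertion_id STRING,
  log_user_id  STRING,
  ts_raw       STRING,
  -- do the cast/normalization in a computed column ...
  ts AS TO_TIMESTAMP(ts_raw),
  -- ... and declare the watermark on that column, so `ts` stays a
  -- rowtime attribute and remains usable in interval joins
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = '...'
);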


Regards,
Timo



On 15.12.20 18:47, Dan Hill wrote:
When I try to refactor my joins into a temporary view to share joins and 
state, I get the following error.  I tried a few variations of the code 
snippets below (adding TIMESTAMP casts based on Google searches).  I 
removed a bunch of fields to simplify this example.


Is this a known issue?  Do I have a simple coding bug?

CREATE TEMPORARY VIEW `flat_impression_view` AS
SELECT
  DATE_FORMAT(input_impression.ts, 'yyyy-MM-dd') AS dt,
  input_insertion.log_user_id AS insertion_log_user_id,
  COALESCE(CAST(input_insertion.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3))) AS insertion_ts,
  input_insertion.insertion_id AS insertion_insertion_id,
  COALESCE(CAST(input_impression.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3))) AS impression_ts,
  input_impression.impression_id AS impression_impression_id,
  input_impression.insertion_id AS impression_insertion_id
FROM input_insertion
JOIN input_impression
  ON input_insertion.insertion_id = input_impression.insertion_id
  AND CAST(input_insertion.ts AS TIMESTAMP) BETWEEN
      CAST(input_impression.ts AS TIMESTAMP) - INTERVAL '12' HOUR AND
      CAST(input_impression.ts AS TIMESTAMP) + INTERVAL '1' HOUR



INSERT INTO `flat_impression_w_click`
SELECT
  dt,
  insertion_log_user_id,
  CAST(insertion_ts AS TIMESTAMP(3)) AS insertion_ts,
  insertion_insertion_id,
  CAST(impression_ts AS TIMESTAMP(3)) AS impression_ts,
  impression_impression_id,
  impression_insertion_id,
  COALESCE(CAST(input_click.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3))) AS click_ts,
  COALESCE(input_click.click_id, EmptyByteArray()) AS click_click_id,
  COALESCE(input_click.impression_id, EmptyByteArray()) AS click_impression_id
FROM flat_impression_view
LEFT JOIN input_click
  ON flat_impression_view.impression_impression_id = input_click.impression_id
  AND CAST(flat_impression_view.impression_ts AS TIMESTAMP) BETWEEN
      CAST(input_click.ts AS TIMESTAMP) - INTERVAL '12' HOUR AND
      CAST(input_click.ts AS TIMESTAMP) + INTERVAL '12' HOUR



java.lang.RuntimeException: Failed to executeSql=...

...

Caused by: org.apache.flink.table.api.TableException: Cannot generate a 
valid execution plan for the given query:


FlinkLogicalLegacySink(name=[...])
+- FlinkLogicalCalc(select=[...])
   +- FlinkLogicalJoin(condition=[AND(=($36, $45), >=(CAST($35):TIMESTAMP(6) NOT NULL, -(CAST($43):TIMESTAMP(6), 4320:INTERVAL HOUR)), <=(CAST($35):TIMESTAMP(6) NOT NULL, +(CAST($43):TIMESTAMP(6), 4320:INTERVAL HOUR)))], joinType=[left])
      :- FlinkLogicalCalc(select=[...])
      :  +- FlinkLogicalJoin(condition=[AND(=($5, $35), >=(CAST($4):TIMESTAMP(6), -(CAST($33):TIMESTAMP(6), 4320:INTERVAL HOUR)), <=(CAST($4):TIMESTAMP(6), +(CAST($33):TIMESTAMP(6), 360:INTERVAL HOUR)))], joinType=[inner])
      :     :- FlinkLogicalDataStreamTableScan(table=[[default, mydb, input_insertion]])
      :     +- FlinkLogicalDataStreamTableScan(table=[[default, mydb, input_impression]])
      +- FlinkLogicalDataStreamTableScan(table=[[default, mydb, input_click]])


Rowtime attributes must not be in the input rows of a regular join. As a 
workaround you can cast the time attributes of input tables to TIMESTAMP 
before.


Please check the documentation for the set of currently supported SQL 
features.


at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
