Re: Flink - Create Temporary View and "Rowtime attributes must not be in the input rows of a regular join"
Hi Dan,

Are you intending to use interval joins, regular joins, or a mixture of both? For regular joins, you must cast the rowtime attribute to TIMESTAMP as early as possible. For interval joins, you must leave the rowtime attribute unmodified. Currently I see expressions such as COALESCE(CAST(input_insertion.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3))) AS insertion_ts and CAST(flat_impression_view.impression_ts AS TIMESTAMP), which implicitly disable interval joins. If you would like to keep the interval join properties, you need to do the casting in a computed column in the CREATE TABLE statement, before declaring a watermark on that column.

Regards,
Timo

On 15.12.20 18:47, Dan Hill wrote:

When I try to refactor my joins into a temporary view to share joins and state, I get the following error. I tried a few variations of the code snippets below (adding TIMESTAMP casts based on Google searches), and I removed a number of fields to simplify the example. Is this a known issue, or do I have a simple coding bug?
CREATE TEMPORARY VIEW `flat_impression_view` AS
SELECT
  DATE_FORMAT(input_impression.ts, 'yyyy-MM-dd') AS dt,
  input_insertion.log_user_id AS insertion_log_user_id,
  COALESCE(CAST(input_insertion.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3))) AS insertion_ts,
  input_insertion.insertion_id AS insertion_insertion_id,
  COALESCE(CAST(input_impression.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3))) AS impression_ts,
  input_impression.impression_id AS impression_impression_id,
  input_impression.insertion_id AS impression_insertion_id
FROM input_insertion
JOIN input_impression
  ON input_insertion.insertion_id = input_impression.insertion_id
  AND CAST(input_insertion.ts AS TIMESTAMP)
    BETWEEN CAST(input_impression.ts AS TIMESTAMP) - INTERVAL '12' HOUR
        AND CAST(input_impression.ts AS TIMESTAMP) + INTERVAL '1' HOUR

INSERT INTO `flat_impression_w_click`
SELECT
  dt,
  insertion_log_user_id,
  CAST(insertion_ts AS TIMESTAMP(3)) AS insertion_ts,
  insertion_insertion_id,
  CAST(impression_ts AS TIMESTAMP(3)) AS impression_ts,
  impression_impression_id,
  impression_insertion_id,
  COALESCE(CAST(input_click.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3))) AS click_ts,
  COALESCE(input_click.click_id, EmptyByteArray()) AS click_click_id,
  COALESCE(input_click.impression_id, EmptyByteArray()) AS click_impression_id
FROM flat_impression_view
LEFT JOIN input_click
  ON flat_impression_view.impression_impression_id = input_click.impression_id
  AND CAST(flat_impression_view.impression_ts AS TIMESTAMP)
    BETWEEN CAST(input_click.ts AS TIMESTAMP) - INTERVAL '12' HOUR
        AND CAST(input_click.ts AS TIMESTAMP) + INTERVAL '12' HOUR

java.lang.RuntimeException: Failed to executeSql=...
...
Caused by: org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

FlinkLogicalLegacySink(name=[...])
+- FlinkLogicalCalc(select=[...])
   +- FlinkLogicalJoin(condition=[AND(=($36, $45), >=(CAST($35):TIMESTAMP(6) NOT NULL, -(CAST($43):TIMESTAMP(6), 4320:INTERVAL HOUR)), <=(CAST($35):TIMESTAMP(6) NOT NULL, +(CAST($43):TIMESTAMP(6), 4320:INTERVAL HOUR)))], joinType=[left])
      :- FlinkLogicalCalc(select=[...])
      :  +- FlinkLogicalJoin(condition=[AND(=($5, $35), >=(CAST($4):TIMESTAMP(6), -(CAST($33):TIMESTAMP(6), 4320:INTERVAL HOUR)), <=(CAST($4):TIMESTAMP(6), +(CAST($33):TIMESTAMP(6), 360:INTERVAL HOUR)))], joinType=[inner])
      :     :- FlinkLogicalDataStreamTableScan(table=[[default, mydb, input_insertion]])
      :     +- FlinkLogicalDataStreamTableScan(table=[[default, mydb, input_impression]])
      +- FlinkLogicalDataStreamTableScan(table=[[default, mydb, input_click]])

Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before. Please check the documentation for the set of currently supported SQL features.
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
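Timo's suggestion, doing the cast in a computed column and declaring the watermark on that column, could look roughly like the sketch below. The connector and the raw_ts field are illustrative assumptions, not the poster's actual schema; the point is only that `ts` remains a rowtime attribute, so downstream interval joins can use it without further casts:

```sql
-- Hypothetical DDL sketch. The cast happens once, in a computed column,
-- and the watermark is declared on that column afterwards. Because `ts`
-- is never wrapped in COALESCE/CAST in later queries, it stays a rowtime
-- attribute and interval joins remain possible.
CREATE TABLE input_impression (
  impression_id BYTES,
  insertion_id BYTES,
  raw_ts BIGINT,                                     -- epoch millis from the source (assumed)
  ts AS TO_TIMESTAMP(FROM_UNIXTIME(raw_ts / 1000)),  -- computed column holding the cast
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND       -- declared after the computed column
) WITH (
  'connector' = '...'                                -- connector details omitted
);
```

Conversely, any projection that wraps `ts` in COALESCE or an extra CAST (as in the view above) strips the rowtime property, which is why the planner falls back to a regular join and then rejects the query.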