Lilo created FLINK-36930:
----------------------------
Summary: Lookup join with JDBC connector fails when join condition
does not include fields from the probe/left table
Key: FLINK-36930
URL: https://issues.apache.org/jira/browse/FLINK-36930
Project: Flink
Issue Type: Bug
Affects Versions: 1.19.1, 1.18.1, 1.17.0
Environment: * Flink versions affected: 1.17.0 and later (tested on 1.19.1)
* Flink version where this worked: 1.16.2
* Connector: JDBC (tested with Apache Derby and MySQL; likely reproducible
with other JDBC drivers). The Hive connector is not affected.
Reporter: Lilo
I've encountered a regression in Flink 1.17.0 (and later versions) in lookup
joins that use the JDBC connector as the lookup source. The issue arises when
the join condition references no fields from the probe/left table, i.e. when
every lookup-table column in the condition is compared against a constant.
This scenario worked correctly in Flink 1.16.2 but now throws a
`TableException`.
The following SQL reproduces the problem.
{code:sql}
SET 'execution.target' = 'local';
-- Create the lookup table (using Apache Derby as an example)
DROP TEMPORARY TABLE IF EXISTS lookup_table;
CREATE TEMPORARY TABLE lookup_table (
  id INT,
  sub_id INT,
  v STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:derby:memory:myInMemoryDB;create=true',
  'table-name' = 'lookup_table'
);
-- Create the main table (using datagen connector)
DROP TEMPORARY TABLE IF EXISTS main_table;
CREATE TEMPORARY TABLE main_table (
  v STRING,
  some_id INT,
  some_sub_id INT,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '2',
  'fields.some_id.kind' = 'random',
  'fields.some_id.min' = '1',
  'fields.some_id.max' = '5',
  'fields.some_sub_id.kind' = 'random',
  'fields.some_sub_id.min' = '1',
  'fields.some_sub_id.max' = '3',
  'fields.v.length' = '10'
);
-- This lookup join works correctly (join condition includes a field from
-- main_table)
EXPLAIN PLAN FOR
SELECT
  t1.*,
  t2.v AS lookup_value
FROM main_table t1
INNER JOIN lookup_table FOR SYSTEM_TIME AS OF t1.proctime AS t2
  ON t2.id = 1
  AND t2.sub_id = t1.some_sub_id;
-- This lookup join FAILS in Flink 1.17.0+ (join condition only uses constants)
EXPLAIN PLAN FOR
SELECT
  t1.*,
  t2.v AS lookup_value
FROM main_table t1
INNER JOIN lookup_table FOR SYSTEM_TIME AS OF t1.proctime AS t2
  ON t2.id = 1
  AND t2.sub_id = 1;
-- Attempting to bypass the issue with a view also fails
DROP VIEW IF EXISTS main_table_view;
CREATE TEMPORARY VIEW main_table_view AS SELECT *, 1 AS fake_id FROM main_table;
EXPLAIN PLAN FOR
SELECT
  t1.*,
  t2.v AS lookup_value
FROM main_table_view t1
INNER JOIN lookup_table FOR SYSTEM_TIME AS OF t1.proctime AS t2
  ON t2.id = 1
  AND t2.sub_id = t1.fake_id;
{code}
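The three join conditions above differ only in whether any lookup-table column is equated with a real field of the probe/left table. The following plain-Python sketch makes that distinction explicit. It is a toy model for illustration, not Flink's planner logic: the names `ProbeField`, `Literal`, `inline_constants` and `probe_side_keys` are hypothetical, and the idea that the view's constant column `fake_id` ends up being treated like a literal is an assumption about why the view workaround fails, not something confirmed here.

```python
# Toy model only: NOT Flink's planner code, just a way to compare the ON clauses.
from typing import NamedTuple


class ProbeField(NamedTuple):
    """Reference to a column of the probe/left table, e.g. t1.some_sub_id."""
    name: str


class Literal(NamedTuple):
    """Constant operand, e.g. 1."""
    value: int


def inline_constants(condition, constant_columns):
    """Replace probe-side columns known to be constants (such as the view's
    fake_id) with literals. That the planner does this is an ASSUMPTION."""
    resolved = {}
    for lookup_col, operand in condition.items():
        if isinstance(operand, ProbeField) and operand.name in constant_columns:
            operand = Literal(constant_columns[operand.name])
        resolved[lookup_col] = operand
    return resolved


def probe_side_keys(condition, constant_columns=None):
    """Return the lookup-table columns equated with a real probe-side field."""
    resolved = inline_constants(condition, constant_columns or {})
    return sorted(col for col, op in resolved.items() if isinstance(op, ProbeField))


# ON t2.id = 1 AND t2.sub_id = t1.some_sub_id
working = {"id": Literal(1), "sub_id": ProbeField("some_sub_id")}
# ON t2.id = 1 AND t2.sub_id = 1
constants_only = {"id": Literal(1), "sub_id": Literal(1)}
# ON t2.id = 1 AND t2.sub_id = t1.fake_id, where the view defines fake_id as 1
via_view = {"id": Literal(1), "sub_id": ProbeField("fake_id")}

print(probe_side_keys(working))                    # ['sub_id']
print(probe_side_keys(constants_only))             # []
print(probe_side_keys(via_view, {"fake_id": 1}))   # []
```

In this model only the first query keeps a probe-side key; the second and third end up with none, which matches the queries reported as failing.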
*Expected Behavior:*
These queries should execute successfully, even when the join condition does
not involve fields from the main table. This was the behavior in Flink 1.16.2.
*Actual Behavior:*
In Flink 1.17.0 and later, the second query (where the join condition only uses
constants) throws the following exception (captured on Flink 1.17.2). The
view-based variant fails as well.
{code:java}
Caused by: org.apache.flink.table.api.TableException: Temporal table join requires an equality condition on fields of table [default_catalog.default_database.lookup_table].
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.validate(CommonExecLookupJoin.java:687)
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.createJoinTransformation(CommonExecLookupJoin.java:249)
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.java:157)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257)
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161)
    at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
{code}