Hi,
I am doing a Flink lookup join with a view based on an Apache Flink Kafka table
and a GetInData Flink HTTP connector table. The lookup is successful for many
types of lookup keys. It is not successful for arrays of primitives or for
Booleans when the value is specified in a view.
For example:
CREATE TEMPORARY VIEW orders_view AS
SELECT
*,
CAST(ARRAY['red','green'] AS ARRAY<STRING>) AS `stringArray`,
PROCTIME() AS `proc_time`
FROM
orders;
And a table for the HTTP connector:
CREATE TEMPORARY TABLE api_table_array
(
customerId STRING,
str1 STRING,
int1 INTEGER,
arr1 ARRAY<STRING>
Then I issue a lookup join:
SELECT * FROM orders_view AS o JOIN api_table_array FOR SYSTEM_TIME AS OF
o.proc_time as a ON o.const_requestBody_stringArray = a.arr1;
This fails with:
org.apache.flink.table.api.TableException: Temporal table join requires an
equality condition on fields of table
[default_catalog.default_database.api_table_array].
The FlinkFilterJoinRule calls joinRel.analyzeCondition() [1]. This ends up in
Calcite code that sets the joinInfo to have no leftKeys or rightKeys and a
nonEquiConditions of:
=(CAST(ARRAY(_UTF-16LE'red':VARCHAR(5) CHARACTER SET "UTF-16LE",
_UTF-16LE'green':VARCHAR(5) CHARACTER SET "UTF-16LE")):VARCHAR(2147483647)
CHARACTER SET "UTF-16LE" ARRAY NOT NULL, $9)
It looks like, because this code has identified a non-equi condition, the
Flink lookup join cannot proceed, as it needs an equality condition.
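To make the suspected mechanism concrete, here is a simplified model in Java of
how I understand the condition split to behave. It is a sketch, not Calcite's
actual code: the class and method names (JoinConditionSketch, InputRef, Call,
analyze) are hypothetical, and the left-side field count of 6 is an assumption
chosen only so that $9 lands on the right-hand table. The assumption being
illustrated is that an equality only becomes a join key when both operands are
plain field references, one from each side. Anything else, such as a CAST over
an ARRAY constructor, falls through to the non-equi list.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of an equi-join key split. Hypothetical names, not Calcite classes. */
final class JoinConditionSketch {

    /** Stand-in for an expression node in a join condition. */
    interface Expr {}

    /** Stand-in for a field reference such as $9 (left fields first, then right). */
    static final class InputRef implements Expr {
        final int index;
        InputRef(int index) { this.index = index; }
        @Override public String toString() { return "$" + index; }
    }

    /** Stand-in for any computed expression, e.g. CAST(ARRAY('red', 'green')). */
    static final class Call implements Expr {
        final String text;
        Call(String text) { this.text = text; }
        @Override public String toString() { return text; }
    }

    /** Result of the split: key pairs plus whatever could not be used as a key. */
    static final class JoinInfo {
        final List<Integer> leftKeys = new ArrayList<>();
        final List<Integer> rightKeys = new ArrayList<>();
        final List<String> nonEquiConditions = new ArrayList<>();
    }

    /** Classifies a single "op0 = op1" condition. */
    static JoinInfo analyze(Expr op0, Expr op1, int leftFieldCount) {
        JoinInfo info = new JoinInfo();
        if (op0 instanceof InputRef && op1 instanceof InputRef) {
            int a = ((InputRef) op0).index;
            int b = ((InputRef) op1).index;
            int left = Math.min(a, b);
            int right = Math.max(a, b);
            // Usable as a key only if the references come from different sides.
            if (left < leftFieldCount && right >= leftFieldCount) {
                info.leftKeys.add(left);
                info.rightKeys.add(right - leftFieldCount);
                return info;
            }
        }
        // Operands were not one field reference from each side: keep as non-equi.
        info.nonEquiConditions.add("=(" + op0 + ", " + op1 + ")");
        return info;
    }

    public static void main(String[] args) {
        int leftFieldCount = 6; // assumption for illustration only

        JoinInfo column = analyze(new InputRef(2), new InputRef(9), leftFieldCount);
        System.out.println("column case keys: " + column.leftKeys + " / " + column.rightKeys);

        JoinInfo literal =
            analyze(new Call("CAST(ARRAY('red', 'green'))"), new InputRef(9), leftFieldCount);
        System.out.println("literal case non-equi: " + literal.nonEquiConditions);
    }
}
```

In this model the column-to-column case yields one key pair, while the array
expression case yields no keys at all, which matches the empty leftKeys and
rightKeys I see in the debugger. It does not explain why String and int
constants still work, so some earlier step presumably handles those differently.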
If I define the array as a column in a file or a Kafka table, the join works.
It is only when I define the array literal in the view that it fails. It also
fails for Boolean. All the other primitive types (String, int, etc.) work.
I am trying to find the code that causes this, and why, as all the other types
work. We are hoping to make this work for Booleans and arrays. I am happy to
contribute a change, given a pointer as to what might need changing.
I am recreating this with Flink 1.20.1, which uses Calcite 1.32. I see that
the non-equi condition is set at [2]. Breakpointing on this line, I see the
condition object has:
op="="
Operand 0 is
CAST(ARRAY(_UTF-16LE'red':VARCHAR(5) CHARACTER SET "UTF-16LE",
_UTF-16LE'green':VARCHAR(5) CHARACTER SET "UTF-16LE")):VARCHAR(2147483647)
CHARACTER SET "UTF-16LE" ARRAY NOT NULL
Operand 1 is
$9
It goes through this code a few times. In the failing case the debugger does
not resolve op0 or op1, and ends up at [2].
Any thoughts on why this is happening and how to resolve this would be greatly
appreciated. I am happy to code a resolution but would need pointers.
Kind regards, David.
[1]
https://github.com/apache/flink/blob/8616d6d811e73979328607db03028ae0220d8491/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java#L349
[2]
https://github.com/apache/calcite/blob/597b1fd54fe5b8586525aed2bc4518ca54a25523/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java#L1709