Hello,
I'm seeing some strange behaviour in Flink SQL: adding a new SELECT
statement causes a previously created interval join to be planned as a
regular join. I'm concerned because the Flink docs make clear that regular
joins can be unsafe, since the state they keep can grow indefinitely.

I have put a worked example in https://github.com/mnuttall/flink-debug. I
have an interval join,

CREATE TEMPORARY VIEW suspiciousOrders AS
    SELECT s.orderId, s.customer, s.product, s.quantity AS order_quantity,
           l.cancel_quantity, l.order_ts AS large_ts, s.ts AS small_ts,
           l.cancel_ts
    FROM smallOrders s JOIN largeCancellations l
    ON s.product = l.product AND s.customer = l.customer
    WHERE s.ts BETWEEN l.cancel_ts - INTERVAL '1' DAY AND l.cancel_ts;

which is planned as an interval join:

[13]:IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-86400000, leftUpperBound=0, leftTimeIndex=0, rightTimeIndex=1], where=[((product = product0) AND (customer = customer0) AND (ts >= (cancel_ts - 86400000:INTERVAL DAY)) AND (ts <= cancel_ts))], select=[ts, orderId, customer, product, quantity, order_ts, cancel_ts, product0, customer0, cancel_quantity])
+- [14]:Calc(select=[orderId, customer, product, quantity AS order_quantity, cancel_quantity, order_ts AS large_ts, ts AS small_ts, cancel_ts])
   +- [15]:ConstraintEnforcer[NotNullEnforcer(fields=[order_quantity, cancel_quantity])]
      +- Sink: Collect table sink

but adding a further temporary view

CREATE TEMPORARY VIEW filteredResults AS
    SELECT * FROM suspiciousOrders WHERE small_ts > large_ts;

changes the interval join to a regular join,

[13]:Join(joinType=[InnerJoin], where=[((product = product0) AND (customer = customer0) AND (ts >= (cancel_ts - 86400000:INTERVAL DAY)) AND (ts <= cancel_ts) AND (ts > order_ts))], select=[ts, orderId, customer, product, quantity, order_ts, cancel_ts, product0, customer0, cancel_quantity], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+- [14]:Calc(select=[orderId, customer, product, quantity AS order_quantity, cancel_quantity, order_ts AS large_ts, ts AS small_ts, cancel_ts])
   +- [15]:ConstraintEnforcer[NotNullEnforcer(fields=[order_quantity, cancel_quantity])]
      +- Sink: Collect table sink
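
For anyone trying to reproduce this, the two plans can probably be compared
side by side with EXPLAIN. The sketch below is only a rough guide. It assumes
the views are defined as above, and that the planner inlines filteredResults
and merges its filter into the join condition, which is what the extra
(ts > order_ts) term in the second plan suggests. The third statement is my
own flattened rewrite of the two views, not something taken from the
repository.

```sql
-- Plan for the original view; expected to show an IntervalJoin.
EXPLAIN PLAN FOR
SELECT * FROM suspiciousOrders;

-- Plan for the filtered view; this is where the regular Join appears.
EXPLAIN PLAN FOR
SELECT * FROM filteredResults;

-- Assumed equivalent of filteredResults with the view inlined by hand:
-- small_ts is s.ts and large_ts is l.order_ts, so the extra filter
-- becomes a third predicate relating the two sides of the join.
EXPLAIN PLAN FOR
SELECT s.orderId, s.customer, s.product, s.quantity AS order_quantity,
       l.cancel_quantity, l.order_ts AS large_ts, s.ts AS small_ts,
       l.cancel_ts
FROM smallOrders s JOIN largeCancellations l
ON s.product = l.product AND s.customer = l.customer
WHERE s.ts BETWEEN l.cancel_ts - INTERVAL '1' DAY AND l.cancel_ts
  AND s.ts > l.order_ts;
```

If the third statement also produces a regular Join, that would point at the
extra predicate on order_ts rather than at the use of a second view.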

Please can someone explain what's happening here? It looks as though my
(safe) interval join is being converted to an (unsafe) regular join - is
that true?

Many thanks in advance.
Regards,

Mark Nuttall
