Hello,

I'm seeing some strange behaviour in Flink SQL: adding a new SELECT statement causes a previously created interval join to be turned into a regular join. This concerns me, as the Flink docs make clear that regular joins are not safe: their memory usage can grow indefinitely.
I have put a worked example at https://github.com/mnuttall/flink-debug. It defines an interval join:

    CREATE TEMPORARY VIEW suspiciousOrders AS
    SELECT s.orderId, s.customer, s.product, s.quantity AS order_quantity,
           l.cancel_quantity, l.order_ts AS large_ts, s.ts AS small_ts, l.cancel_ts
    FROM smallOrders s
    JOIN largeCancellations l
      ON s.product = l.product AND s.customer = l.customer
    WHERE s.ts BETWEEN l.cancel_ts - INTERVAL '1' DAY AND l.cancel_ts;

which evaluates to:

    [13]:IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-86400000, leftUpperBound=0, leftTimeIndex=0, rightTimeIndex=1], where=[((product = product0) AND (customer = customer0) AND (ts >= (cancel_ts - 86400000:INTERVAL DAY)) AND (ts <= cancel_ts))], select=[ts, orderId, customer, product, quantity, order_ts, cancel_ts, product0, customer0, cancel_quantity])
    +- [14]:Calc(select=[orderId, customer, product, quantity AS order_quantity, cancel_quantity, order_ts AS large_ts, ts AS small_ts, cancel_ts])
       +- [15]:ConstraintEnforcer[NotNullEnforcer(fields=[order_quantity, cancel_quantity])]
          +- Sink: Collect table sink

However, adding a further temporary view on top of it:

    CREATE TEMPORARY VIEW filteredResults AS
    SELECT * FROM suspiciousOrders WHERE small_ts > large_ts;

changes the interval join into a regular join:

    [13]:Join(joinType=[InnerJoin], where=[((product = product0) AND (customer = customer0) AND (ts >= (cancel_ts - 86400000:INTERVAL DAY)) AND (ts <= cancel_ts) AND (ts > order_ts))], select=[ts, orderId, customer, product, quantity, order_ts, cancel_ts, product0, customer0, cancel_quantity], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
    +- [14]:Calc(select=[orderId, customer, product, quantity AS order_quantity, cancel_quantity, order_ts AS large_ts, ts AS small_ts, cancel_ts])
       +- [15]:ConstraintEnforcer[NotNullEnforcer(fields=[order_quantity, cancel_quantity])]
          +- Sink: Collect table sink

Please can someone explain what's happening here?
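For reference, the two where clauses seem to differ only by the extra conjunct (ts > order_ts), which looks like the small_ts > large_ts filter from filteredResults pushed down into the join condition. The short Python sketch below (purely illustrative, not Flink code) makes that comparison explicit. The condition strings are copied from the two plans, and splitting on " AND " is a simplifying assumption that only holds for these flat, AND-only conditions.

```python
# Join conditions copied verbatim from the IntervalJoin and Join plans.
interval_where = ("((product = product0) AND (customer = customer0) AND "
                  "(ts >= (cancel_ts - 86400000:INTERVAL DAY)) AND (ts <= cancel_ts))")
regular_where = ("((product = product0) AND (customer = customer0) AND "
                 "(ts >= (cancel_ts - 86400000:INTERVAL DAY)) AND (ts <= cancel_ts) "
                 "AND (ts > order_ts))")


def conjuncts(where: str) -> list[str]:
    """Split a flat conjunction into its terms.

    Naive on purpose: strips one pair of outer parentheses and splits on
    ' AND ', which is only valid for flat, AND-only conditions like these.
    """
    body = where[1:-1] if where.startswith("(") and where.endswith(")") else where
    return [term.strip() for term in body.split(" AND ")]


# Terms present in one plan's condition but not the other's.
added = [t for t in conjuncts(regular_where) if t not in conjuncts(interval_where)]
removed = [t for t in conjuncts(interval_where) if t not in conjuncts(regular_where)]

print("added:", added)
print("removed:", removed)
```

Running it prints added: ['(ts > order_ts)'] and removed: [], i.e. the regular join's condition is the interval join's condition plus that one extra time comparison.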
It looks as though my (safe) interval join is being converted to an (unsafe) regular join; is that true?

Many thanks in advance.

Regards,
Mark Nuttall