Caizhi Weng created FLINK-25357:
-----------------------------------

             Summary: SQL planner incorrectly changes a streaming join with FLOOR(rowtime) into interval join
                 Key: FLINK-25357
                 URL: https://issues.apache.org/jira/browse/FLINK-25357
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.14.2
            Reporter: Caizhi Weng


This issue was reported on the [user mailing list|https://lists.apache.org/thread/v8omhomp58hb8m5dj4noxbr1dsyy6zjl].

Add the following test case to {{TableEnvironmentITCase}} to reproduce this issue.
{code:scala}
@Test
def myTest(): Unit = {
  val data = Seq(
    Row.of(
      "1",
      java.time.LocalDateTime.of(2021, 12, 13, 12, 5, 8)
    ),
    Row.of(
      "1",
      java.time.LocalDateTime.of(2021, 12, 13, 13, 5, 4)
    ),
    Row.of(
      "1",
      java.time.LocalDateTime.of(2021, 12, 13, 14, 5, 6)
    )
  )

  tEnv.executeSql(
    s"""
       |create table T (
       |  id STRING,
       |  b TIMESTAMP(3),
       |  WATERMARK FOR b AS b - INTERVAL '60' MINUTES
       |) WITH (
       |  'connector' = 'values',
       |  'bounded' = 'true',
       |  'data-id' = '${TestValuesTableFactory.registerData(data)}'
       |)
       |""".stripMargin)
  tEnv.executeSql(
    """
      |SELECT
      |  source.id AS sourceid,
      |  CAST(source.b AS TIMESTAMP) AS source_startat,
      |  CAST(target.b AS TIMESTAMP) AS target_startat
      |FROM T source, T target
      |WHERE source.id = target.id
      |AND source.id IN ('1', '2', '3')
      |AND source.b >= FLOOR(target.b TO HOUR) + INTERVAL '1' HOUR AND source.b < FLOOR(target.b TO HOUR) + INTERVAL '2' HOUR
      |""".stripMargin).print()
}
{code}

Results (correct) for the batch job are
{code}
+--------------------------------+----------------------------+----------------------------+
|                       sourceid |             source_startat |             target_startat |
+--------------------------------+----------------------------+----------------------------+
|                              1 | 2021-12-13 13:05:04.000000 | 2021-12-13 12:05:08.000000 |
|                              1 | 2021-12-13 14:05:06.000000 | 2021-12-13 13:05:04.000000 |
+--------------------------------+----------------------------+----------------------------+
{code}

Results (incorrect) for the streaming job are
{code}
+----+--------------------------------+----------------------------+----------------------------+
| op |                       sourceid |             source_startat |             target_startat |
+----+--------------------------------+----------------------------+----------------------------+
| +I |                              1 | 2021-12-13 14:05:06.000000 | 2021-12-13 12:05:08.000000 |
| +I |                              1 | 2021-12-13 14:05:06.000000 | 2021-12-13 13:05:04.000000 |
+----+--------------------------------+----------------------------+----------------------------+
{code}

The plan for the streaming job is
{code}
LogicalProject(sourceid=[$0], source_startat=[CAST($1):TIMESTAMP(6)], target_startat=[CAST($3):TIMESTAMP(6)])
+- LogicalFilter(condition=[AND(=($0, $2), OR(=($0, _UTF-16LE'1'), =($0, _UTF-16LE'2'), =($0, _UTF-16LE'3')), >=($1, +(FLOOR($3, FLAG(HOUR)), 3600000:INTERVAL HOUR)), <($1, +(FLOOR($3, FLAG(HOUR)), 7200000:INTERVAL HOUR)))])
   +- LogicalJoin(condition=[true], joinType=[inner])
      :- LogicalWatermarkAssigner(rowtime=[b], watermark=[-($1, 3600000:INTERVAL MINUTE)])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, T]])
      +- LogicalWatermarkAssigner(rowtime=[b], watermark=[-($1, 3600000:INTERVAL MINUTE)])
         +- LogicalTableScan(table=[[default_catalog, default_database, T]])

== Optimized Physical Plan ==
Calc(select=[id AS sourceid, CAST(CAST(b)) AS source_startat, CAST(CAST(b0)) AS target_startat])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=3600000, leftUpperBound=7199999, leftTimeIndex=1, rightTimeIndex=1], where=[AND(=(id, id0), >=(b, +(FLOOR(b0, FLAG(HOUR)), 3600000:INTERVAL HOUR)), <(b, +(FLOOR(b0, FLAG(HOUR)), 7200000:INTERVAL HOUR)))], select=[id, b, id0, b0])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, b], where=[SEARCH(id, Sarg[_UTF-16LE'1', _UTF-16LE'2', _UTF-16LE'3']:CHAR(1) CHARACTER SET "UTF-16LE")])
   :     +- WatermarkAssigner(rowtime=[b], watermark=[-(b, 3600000:INTERVAL MINUTE)])
   :        +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, b])
   +- Exchange(distribution=[hash[id]])
      +- WatermarkAssigner(rowtime=[b], watermark=[-(b, 3600000:INTERVAL MINUTE)])
         +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, b])

== Optimized Execution Plan ==
Calc(select=[id AS sourceid, CAST(CAST(b)) AS source_startat, CAST(CAST(b0)) AS target_startat])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=3600000, leftUpperBound=7199999, leftTimeIndex=1, rightTimeIndex=1], where=[((id = id0) AND (b >= (FLOOR(b0, FLAG(HOUR)) + 3600000:INTERVAL HOUR)) AND (b < (FLOOR(b0, FLAG(HOUR)) + 7200000:INTERVAL HOUR)))], select=[id, b, id0, b0])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, b], where=[SEARCH(id, Sarg[_UTF-16LE'1', _UTF-16LE'2', _UTF-16LE'3']:CHAR(1) CHARACTER SET "UTF-16LE")])
   :     +- WatermarkAssigner(rowtime=[b], watermark=[(b - 3600000:INTERVAL MINUTE)])(reuse_id=[1])
   :        +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, b])
   +- Exchange(distribution=[hash[id]])
      +- Reused(reference_id=[1])
{code}

You can see that the planner incorrectly rewrites this join into an interval join. The generated condition for the interval join is also incorrect: it always returns true, which is why the first (wrong) row of the streaming result is produced.
{code:java}
public class ConditionFunction$171
        extends org.apache.flink.api.common.functions.AbstractRichFunction
        implements org.apache.flink.table.runtime.generated.JoinCondition {

    public ConditionFunction$171(Object[] references) throws Exception {}

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {}

    @Override
    public boolean apply(
            org.apache.flink.table.data.RowData in1, org.apache.flink.table.data.RowData in2)
            throws Exception {

        return true;
    }

    @Override
    public void close() throws Exception {
        super.close();
    }
}
{code}
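
To make the mismatch concrete: {{FLOOR(target.b TO HOUR)}} subtracts target.b's sub-hour part, so {{source.b - target.b}} can legitimately fall anywhere in (0h, 2h), while the derived window bounds only admit differences in [3600000 ms, 7199999 ms]. The following standalone sketch (plain Java using {{java.time}}, not Flink code; the class and method names are made up for illustration) checks the rows from the test case above against both the original SQL predicate and the planner-derived bounds:
{code:java}
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

public class FloorJoinBounds {

    // The original SQL predicate:
    //   source.b >= FLOOR(target.b TO HOUR) + INTERVAL '1' HOUR
    //   AND source.b < FLOOR(target.b TO HOUR) + INTERVAL '2' HOUR
    public static boolean sqlPredicate(LocalDateTime source, LocalDateTime target) {
        LocalDateTime floored = target.truncatedTo(ChronoUnit.HOURS);
        return !source.isBefore(floored.plusHours(1)) && source.isBefore(floored.plusHours(2));
    }

    // The window bounds the planner derived for the interval join:
    //   leftLowerBound=3600000, leftUpperBound=7199999 (milliseconds)
    public static boolean plannerBounds(LocalDateTime source, LocalDateTime target) {
        long diffMillis = Duration.between(target, source).toMillis();
        return diffMillis >= 3600000L && diffMillis <= 7199999L;
    }

    public static void main(String[] args) {
        LocalDateTime t1 = LocalDateTime.of(2021, 12, 13, 12, 5, 8);
        LocalDateTime t2 = LocalDateTime.of(2021, 12, 13, 13, 5, 4);
        LocalDateTime t3 = LocalDateTime.of(2021, 12, 13, 14, 5, 6);

        // Row dropped by streaming: source 13:05:04, target 12:05:08.
        // The SQL predicate holds, but the rows are only 59m56s apart,
        // so the derived lower bound of 1h filters the pair out.
        System.out.println(sqlPredicate(t2, t1) + " " + plannerBounds(t2, t1)); // true false

        // Wrong row emitted by streaming: source 14:05:06, target 12:05:08.
        // The SQL predicate fails (14:05:06 is past FLOOR + 2h = 14:00:00),
        // but the 1h59m58s difference sits inside the derived bounds, and the
        // generated ConditionFunction always returns true, so the pair is kept.
        System.out.println(sqlPredicate(t3, t1) + " " + plannerBounds(t3, t1)); // false true
    }
}
{code}
So every pair that the interval join emits or drops is decided solely by the (wrong) window bounds, never by the actual FLOOR condition.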



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
