Caizhi Weng created FLINK-22730:
-----------------------------------

             Summary: Lookup join condition with CURRENT_DATE fails to filter 
records
                 Key: FLINK-22730
                 URL: https://issues.apache.org/jira/browse/FLINK-22730
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.13.0, 1.12.0
            Reporter: Caizhi Weng


Add the following test case to 
org.apache.flink.table.api.TableEnvironmentITCase to reproduce this bug.

{code:scala}
// Reproduction for FLINK-22730: a lookup join whose WHERE condition references CURRENT_DATE.
@Test
def myTest(): Unit = {
  // Table Ta (left side of the join): a single row with ts = 2000-01-01 00:00,
  // plus a computed processing-time column `proc` used by FOR SYSTEM_TIME AS OF.
  val id1 = TestValuesTableFactory.registerData(
    Seq(Row.of("abc", LocalDateTime.of(2000, 1, 1, 0, 0))))
  val ddl1 =
    s"""
       |CREATE TABLE Ta (
       |  id VARCHAR,
       |  ts TIMESTAMP,
       |  proc AS PROCTIME()
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$id1',
       |  'bounded' = 'true'
       |)
       |""".stripMargin
  tEnv.executeSql(ddl1)

  // Table Tb (the side joined via FOR SYSTEM_TIME AS OF): a single row with
  // the same id and ts = 2000-01-02 00:00.
  val id2 = TestValuesTableFactory.registerData(
    Seq(Row.of("abc", LocalDateTime.of(2000, 1, 2, 0, 0))))
  val ddl2 =
    s"""
       |CREATE TABLE Tb (
       |  id VARCHAR,
       |  ts TIMESTAMP
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$id2',
       |  'bounded' = 'true'
       |)
       |""".stripMargin
  tEnv.executeSql(ddl2)

  // The WHERE clause compares the timestamp (as a string) against
  // "<CURRENT_DATE> 00:00:00". Both rows carry timestamps from the year 2000,
  // so the joined row is expected to be filtered out.
  val it = tEnv.executeSql(
    """
      |SELECT * FROM Ta AS t1
      |INNER JOIN Tb FOR SYSTEM_TIME AS OF t1.proc AS t2
      |ON t1.id = t2.id
      |WHERE CAST(coalesce(t1.ts, t2.ts) AS VARCHAR) >= 
CONCAT(CAST(CURRENT_DATE AS VARCHAR), ' 00:00:00')
      |""".stripMargin).collect()

  // Print every row the query returned.
  while (it.hasNext) {
    System.out.println(it.next())
  }
}
{code}

The result is
{code}
+I[abc, 2000-01-01T00:00, 2021-05-20T14:30:47.735Z, abc, 2000-01-02T00:00]
{code}

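which is incorrect: both timestamps are from the year 2000, so the {{WHERE}} 
condition on {{CURRENT_DATE}} should have filtered this row out.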

The generated operator is as follows

{code:java}
/**
 * Generated collector for the lookup join, quoted verbatim from the report.
 *
 * <p>{@code collect} evaluates the join's remaining condition
 * ({@code CAST(COALESCE(t1.ts, t2.ts) AS VARCHAR) >= CONCAT(CAST(CURRENT_DATE AS VARCHAR), ' 00:00:00')})
 * for each looked-up record and emits the joined row only when it holds.
 *
 * <p>NOTE(review): the {@code date} field is read in {@code collect} but is never
 * assigned anywhere in this class.
 */
public class JoinTableFuncCollector$22 extends 
org.apache.flink.table.runtime.collector.TableFunctionCollector {

    // Reused output row holding the two fields copied from the looked-up record.
    org.apache.flink.table.data.GenericRowData out = new 
org.apache.flink.table.data.GenericRowData(2);
    // Reused wrapper that joins the input row with 'out' for emission.
    org.apache.flink.table.data.utils.JoinedRowData joinedRow$9 = new 
org.apache.flink.table.data.utils.JoinedRowData();
    private static final java.util.TimeZone timeZone =
            java.util.TimeZone.getTimeZone("Asia/Shanghai");
    // The three fields below are only declared; no statement in this class assigns them.
    private org.apache.flink.table.data.TimestampData timestamp;
    private org.apache.flink.table.data.TimestampData localTimestamp;
    private int date;

    // Constant second argument of CONCAT.
    private final org.apache.flink.table.data.binary.BinaryStringData str$17 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString(" 00:00:00");


    // Empty constructor: 'references' is not used and nothing is initialized.
    public JoinTableFuncCollector$22(Object[] references) throws Exception {

    }

    // Empty: no per-task initialization is performed.
    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) 
throws Exception {

    }

    /**
     * Evaluates the filter condition for one looked-up record and, if it passes,
     * emits the input row joined with that record's fields.
     *
     * @param record the looked-up row (cast to RowData as in2); the other side (in1) comes from getInput()
     */
    @Override
    public void collect(Object record) throws Exception {
        org.apache.flink.table.data.RowData in1 = 
(org.apache.flink.table.data.RowData) getInput();
        org.apache.flink.table.data.RowData in2 = 
(org.apache.flink.table.data.RowData) record;
        org.apache.flink.table.data.binary.BinaryStringData field$7;
        boolean isNull$7;
        org.apache.flink.table.data.TimestampData field$8;
        boolean isNull$8;
        org.apache.flink.table.data.TimestampData field$10;
        boolean isNull$10;
        boolean isNull$13;
        org.apache.flink.table.data.binary.BinaryStringData result$14;
        boolean isNull$15;
        org.apache.flink.table.data.binary.BinaryStringData result$16;
        boolean isNull$18;
        org.apache.flink.table.data.binary.BinaryStringData result$19;
        boolean isNull$20;
        boolean result$21;
        // Read in2's column 1 (timestamp) and column 0 (string), tracking nullness.
        isNull$8 = in2.isNullAt(1);
        field$8 = null;
        if (!isNull$8) {
            field$8 = in2.getTimestamp(1, 6);
        }
        isNull$7 = in2.isNullAt(0);
        field$7 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
        if (!isNull$7) {
            field$7 = ((org.apache.flink.table.data.binary.BinaryStringData) 
in2.getString(0));
        }
        // Read in1's column 1 (timestamp).
        isNull$10 = in1.isNullAt(1);
        field$10 = null;
        if (!isNull$10) {
            field$10 = in1.getTimestamp(1, 6);
        }



        // COALESCE: use in1's timestamp when it is non-null, otherwise in2's.
        boolean result$11 = !isNull$10;
        org.apache.flink.table.data.TimestampData result$12 = null;
        boolean isNull$12;
        if (result$11) {

            isNull$12 = isNull$10;
            if (!isNull$12) {
                result$12 = field$10;
            }
        }
        else {

            isNull$12 = isNull$8;
            if (!isNull$12) {
                result$12 = field$8;
            }
        }
        // Left operand: the coalesced timestamp rendered as a string.
        isNull$13 = isNull$12;
        result$14 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
        if (!isNull$13) {

            result$14 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString(org.apache.flink.table.runtime.functions.SqlDateTimeUtils.timestampToString(result$12,
 6));
            isNull$13 = (result$14 == null);
        }




        // CAST(CURRENT_DATE AS VARCHAR): converts the 'date' field to a string.
        // NOTE(review): 'date' has not been assigned at this point, so it still holds
        // the Java default for an int field (0) rather than the current date.
        isNull$15 = false;
        result$16 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
        if (!isNull$15) {

            result$16 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString(org.apache.calcite.avatica.util.DateTimeUtils.unixDateToString(((int)
 date)));
            isNull$15 = (result$16 == null);
        }


        // Right operand: CONCAT(<date string>, " 00:00:00").
        result$19 = 
org.apache.flink.table.data.binary.BinaryStringDataUtil.concat(( isNull$15 ) ? 
null : (result$16), ( false ) ? null : 
(((org.apache.flink.table.data.binary.BinaryStringData) str$17)));
        isNull$18 = (result$19 == null);
        if (isNull$18) {
            result$19 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
        }

        // left >= right, evaluated only when neither operand is null; otherwise false.
        isNull$20 = isNull$13 || isNull$18;
        result$21 = false;
        if (!isNull$20) {

            result$21 = ((result$14 == null) ? ((result$19 == null) ? 0 : -1) : 
((result$19 == null) ? 1 : (result$14.compareTo(result$19)))) >= 0;

        }

        // Condition passed: copy in2's two fields into 'out' and emit in1 joined with 'out'.
        if (result$21) {





            if (isNull$7) {
                out.setField(0, null);
            } else {
                out.setField(0, field$7);
            }



            if (isNull$8) {
                out.setField(1, null);
            } else {
                out.setField(1, field$8);
            }


            joinedRow$9.replace(in1, out);
            joinedRow$9.setRowKind(in1.getRowKind());
            outputResult(joinedRow$9);

        }

    }

    // Empty: nothing to release.
    @Override
    public void close() throws Exception {

    }
}
{code}

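The member variable {{date}} is never assigned before it is used in 
{{collect}}, so it still holds its default value 0 (i.e. 1970-01-01) instead of 
the current date. The condition therefore compares against 
"1970-01-01 00:00:00" and lets the row through, which causes this bug.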

This is because 
{{LookupJoinCodeGenerator#generateTableFunctionCollectorForJoinTable}} omits 
{{${ctx.reusePerRecordCode()}}} when generating the {{collect}} method.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to