lincoln lee created FLINK-30598:
-----------------------------------

             Summary: Wrong code generated for WatermarkGenerator because of 
inconsistent source type info when deserialized from exec plan
                 Key: FLINK-30598
                 URL: https://issues.apache.org/jira/browse/FLINK-30598
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.16.0
            Reporter: lincoln lee
            Assignee: lincoln lee
             Fix For: 1.17.0, 1.16.1


When compiling from an existing exec plan that contains a watermark 
declaration referring to a metadata column, the generated code for 
WatermarkGenerator may be wrong.

This happens because `DynamicTableSourceSpec.getTableSource` currently passes 
the user-defined schema to `SourceAbilitySpec` to perform optimizations such as 
projection/watermark pushdown, while the optimization path from SQL uses a 
fixed reordered form: "PHYSICAL COLUMNS + METADATA COLUMNS". When the two 
column orders differ, a field index resolved against one schema can point to a 
different column in the other.
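
The mismatch can be illustrated with a small standalone sketch. It does not use any Flink classes: `ColumnOrderSketch`, `Column`, `declaredSchema`, `physicalThenMetadata` and `indexOf` are hypothetical names made up for this illustration, and the reordering rule is only my reading of the "PHYSICAL COLUMNS + METADATA COLUMNS" form described above. It resolves the index of `ts` in the reordered schema and then looks that index up in the user-declared schema from the repro case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ColumnOrderSketch {

    /** Hypothetical column descriptor; not a Flink class. */
    static final class Column {
        final String name;
        final String type;
        final boolean metadata;

        Column(String name, String type, boolean metadata) {
            this.name = name;
            this.type = type;
            this.metadata = metadata;
        }
    }

    /** User-declared order from the repro case: the metadata column comes first. */
    static List<Column> declaredSchema() {
        return Arrays.asList(
                new Column("ts", "TIMESTAMP(3)", true),
                new Column("a", "INT", false),
                new Column("b", "BIGINT", false),
                new Column("c", "VARCHAR", false));
    }

    /** Assumed reordering: all physical columns first, then all metadata columns. */
    static List<Column> physicalThenMetadata(List<Column> declared) {
        List<Column> reordered = new ArrayList<>();
        for (Column c : declared) {
            if (!c.metadata) {
                reordered.add(c);
            }
        }
        for (Column c : declared) {
            if (c.metadata) {
                reordered.add(c);
            }
        }
        return reordered;
    }

    /** Returns the position of the named column, or -1 if it is absent. */
    static int indexOf(List<Column> columns, String name) {
        for (int i = 0; i < columns.size(); i++) {
            if (columns.get(i).name.equals(name)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Column> declared = declaredSchema();
        List<Column> reordered = physicalThenMetadata(declared);

        // Index of the watermark field as seen in the reordered schema.
        int tsIndex = indexOf(reordered, "ts");

        // Interpreting the same index against the declared schema hits another column.
        Column hit = declared.get(tsIndex);
        System.out.println("index of ts in reordered schema: " + tsIndex);
        System.out.println("declared column at that index: " + hit.name + " " + hit.type);
    }
}
```

Under these assumptions the index taken from the reordered schema lands on the varchar column `c` in the declared schema, which matches the `row.getString(3)` symptom reported in this issue.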

A repro case:

{code}
@Test
    public void testWatermarkPushDownWithMetadata() throws Exception {
        // to verify FLINK-: the case declares the metadata field first; without the
        // fix, WatermarkGeneratorCodeGenerator generates wrong code that references
        // the incorrect varchar column as the watermark field.
        createTestValuesSourceTable(
                "MyTable",
                JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()),
                new String[] {
                    "ts timestamp(3) metadata",
                    "a int",
                    "b bigint",
                    "c varchar",
                    "watermark for ts as ts - interval '5' second"
                },
                new HashMap<String, String>() {
                    {
                        put("enable-watermark-push-down", "true");
                        put("readable-metadata", "ts:timestamp(3)");
                    }
                });

        File sinkPath =
                createTestCsvSinkTable(
                        "MySink", "a int", "b bigint", "c varchar", "ts timestamp(3)");

        compileSqlAndExecutePlan(
                        "insert into MySink select a, b, c, ts from MyTable where b = 3")
                .await();

        assertResult(
                Arrays.asList(
                        "4,3,Hello world, how are you?," + toLocalDateTime(4000L),
                        "5,3,I am fine.," + toLocalDateTime(5000L),
                        "6,3,Luke Skywalker," + toLocalDateTime(6000L)),
                sinkPath);
    }
{code}

The wrong generated code snippet (`row.getString(3)` reads the varchar column, but the watermark field should be a TimestampData):
{code}
public Long currentWatermark(org.apache.flink.table.data.RowData row) throws Exception {
          
          org.apache.flink.table.data.binary.BinaryStringData field$19;
          boolean isNull$19;
          org.apache.flink.table.data.binary.BinaryStringData field$21;
          boolean isNull$22;
          org.apache.flink.table.data.TimestampData result$23;
          boolean isNull$24;
          org.apache.flink.table.data.TimestampData result$25;
          
          isNull$19 = row.isNullAt(3);
          field$19 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
          if (!isNull$19) {
            field$19 = ((org.apache.flink.table.data.binary.BinaryStringData) row.getString(3));
          }
{code}

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
