lincoln lee created FLINK-30598:
-----------------------------------
Summary: Wrong code generated for WatermarkGenerator because of
inconsistent source type info when deserialized from exec plan
Key: FLINK-30598
URL: https://issues.apache.org/jira/browse/FLINK-30598
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.16.0
Reporter: lincoln lee
Assignee: lincoln lee
Fix For: 1.17.0, 1.16.1
When compiling from an existing exec plan that contains a watermark
declaration referring to a metadata column, the generated code for the
WatermarkGenerator may be wrong.
The cause is that `DynamicTableSourceSpec`.getTableSource currently passes the
user-defined schema to `SourceAbilitySpec` to apply optimizations such as
projection/watermark push-down, while the optimization path from SQL uses a
fixed reordered form: "PHYSICAL COLUMNS + METADATA COLUMNS". When the declared
column order differs from that form, the two layouts disagree, which causes
the problem.
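The layout mismatch can be illustrated with a small, Flink-free sketch. The
class `ColumnOrderMismatch` and the helper `physicalThenMetadata` are
hypothetical names introduced only for this illustration; they are not planner
APIs. The sketch assumes the column list from the repro case in this issue
(`ts` is the metadata column, declared first) and shows one reading that is
consistent with the wrong code reported here: an index that is correct in the
reordered layout points at the varchar column when it is interpreted against
the user-declared schema.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ColumnOrderMismatch {

    /** Hypothetical helper: physical columns first, metadata columns last. */
    static List<String> physicalThenMetadata(List<String> declared, List<String> metadata) {
        List<String> reordered = new ArrayList<>();
        // First pass keeps the physical columns in their declared order.
        for (String column : declared) {
            if (!metadata.contains(column)) {
                reordered.add(column);
            }
        }
        // Second pass appends the metadata columns.
        for (String column : declared) {
            if (metadata.contains(column)) {
                reordered.add(column);
            }
        }
        return reordered;
    }

    public static void main(String[] args) {
        // Column order as declared by the user in the repro case.
        List<String> declared = Arrays.asList("ts", "a", "b", "c");
        List<String> metadata = Arrays.asList("ts");

        List<String> reordered = physicalThenMetadata(declared, metadata);
        int tsIndex = reordered.indexOf("ts");

        System.out.println("reordered layout: " + reordered);
        System.out.println("index of ts in reordered layout: " + tsIndex);
        // Looking the same index up in the declared schema yields a different column.
        System.out.println("column at that index in declared schema: " + declared.get(tsIndex));
    }
}
```

With the repro schema, `ts` sits at index 3 in the reordered layout, while
index 3 of the declared schema is `c` (the varchar column).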
A repro case:
{code}
@Test
public void testWatermarkPushDownWithMetadata() throws Exception {
    // to verify FLINK-: the case declares metadata field first, without fix it will get a
    // wrong code generated by WatermarkGeneratorCodeGenerator which reference the incorrect
    // varchar column as the watermark field.
    createTestValuesSourceTable(
            "MyTable",
            JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()),
            new String[] {
                "ts timestamp(3) metadata",
                "a int",
                "b bigint",
                "c varchar",
                "watermark for ts as ts - interval '5' second"
            },
            new HashMap<String, String>() {
                {
                    put("enable-watermark-push-down", "true");
                    put("readable-metadata", "ts:timestamp(3)");
                }
            });

    File sinkPath =
            createTestCsvSinkTable(
                    "MySink", "a int", "b bigint", "c varchar", "ts timestamp(3)");

    compileSqlAndExecutePlan("insert into MySink select a, b, c, ts from MyTable where b = 3")
            .await();

    assertResult(
            Arrays.asList(
                    "4,3,Hello world, how are you?," + toLocalDateTime(4000L),
                    "5,3,I am fine.," + toLocalDateTime(5000L),
                    "6,3,Luke Skywalker," + toLocalDateTime(6000L)),
            sinkPath);
}
{code}
The wrongly generated code snippet (`row.getString(3)` reads the watermark
field as a string, but the field should be read as a TimestampData):
{code}
public Long currentWatermark(org.apache.flink.table.data.RowData row) throws Exception {
    org.apache.flink.table.data.binary.BinaryStringData field$19;
    boolean isNull$19;
    org.apache.flink.table.data.binary.BinaryStringData field$21;
    boolean isNull$22;
    org.apache.flink.table.data.TimestampData result$23;
    boolean isNull$24;
    org.apache.flink.table.data.TimestampData result$25;
    isNull$19 = row.isNullAt(3);
    field$19 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
    if (!isNull$19) {
        field$19 = ((org.apache.flink.table.data.binary.BinaryStringData) row.getString(3));
    }
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)