[ https://issues.apache.org/jira/browse/FLINK-39899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18088235#comment-18088235 ]
Chen Zhang commented on FLINK-39899:
------------------------------------
Update: the solution above will affect how the WindowDeduplicate operator works.
> Flink SQL Window TVF didn't remove rowtime attribute from original rowtime
> field
> --------------------------------------------------------------------------------
>
> Key: FLINK-39899
> URL: https://issues.apache.org/jira/browse/FLINK-39899
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.19.3, 1.20.4, 2.1.2, 2.2.1, 2.3.1
> Reporter: Chen Zhang
> Priority: Major
> Labels: pull-request-available
>
> h2. Summary
> Window Table-Valued Functions (TUMBLE/HOP/CUMULATE) do not materialize the
> original rowtime attribute column to a regular {{TIMESTAMP}} type in the
> output schema. Per the FLIP-145 specification, the original time attribute should
> become a regular timestamp after applying the window TVF, with only
> {{window_time}} remaining as the rowtime attribute. Instead, both the
> original column and {{window_time}} retain the {{*ROWTIME*}} indicator.
> h2. Description
> *FLIP-145 states:*
> {quote}
> The original row time attribute "timecol" will be a regular timestamp column
> after applying window TVF.
> {quote}
> *Actual behavior:*
> After applying {{TUMBLE(TABLE t, DESCRIPTOR(ts), INTERVAL '10' SECOND)}}, the
> output schema shows:
> {code}
> `window_time` TIMESTAMP(3) NOT NULL *ROWTIME*
> `ts` TIMESTAMP(3) *ROWTIME* <-- should be regular TIMESTAMP(3)
> {code}
> Both {{ts}} and {{window_time}} are marked as {{*ROWTIME*}}, violating the
> FLIP-145 design.
> *Evidence from plan AST:*
> {code}
> LogicalTableFunctionScan(
> invocation=[TUMBLE(DESCRIPTOR($2), 10000:INTERVAL SECOND)],
> rowType=[RecordType(
> VARCHAR entity_id,
> VARCHAR payload,
> TIMESTAMP(3) *ROWTIME* ts, <-- STILL ROWTIME
> TIMESTAMP(3) window_start,
> TIMESTAMP(3) window_end,
> TIMESTAMP(3) *ROWTIME* window_time
> )]
> )
> {code}
> h2. Impact
> h3. 1. Silent data loss in OVER aggregation after window TVF
> Because {{ts}} retains {{*ROWTIME*}}, it passes the time-attribute validation
> in {{StreamExecOverAggregate.translateToPlanInternal()}}. The planner accepts
> queries like:
> {code:sql}
> SELECT *, COUNT(*) OVER (PARTITION BY id ORDER BY ts ROWS UNBOUNDED PRECEDING)
> FROM TABLE(TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '10' SECOND))
> {code}
> At runtime, the window aggregate operator ({{SlicingWindowOperator}})
> registers timers at {{window_end - 1}} and forwards intermediate watermarks.
> When the window fires and emits records, records with {{ts}} values early in
> the window are *behind the downstream OVER operator's watermark* and are
> *silently dropped as late*.
> Example: For a window {{[12:00:00, 12:00:10)}}:
> * Intermediate watermarks (e.g., {{12:00:05}}, {{12:00:08}}) are forwarded to
> the OVER operator
> * When the window fires, records with {{ts = 12:00:01}}, {{12:00:03}},
> {{12:00:06}} are late (behind watermark {{12:00:08}})
> * Only records near the end of the window survive
> * With {{window_time = 12:00:09.999}}, all records share the same timestamp
> and none are dropped
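
The example above can be reproduced with a small toy model. This is only a sketch under stated assumptions, not Flink code: the class `LateRecordSketch` and its methods are hypothetical, windows are aligned to midnight, and a record is treated as late when its timestamp is at or before the current watermark.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

/** Toy model of watermark-based late-record dropping; not Flink code. */
final class LateRecordSketch {

    /**
     * Exclusive end of the tumbling window containing ts.
     * Assumption: windows are aligned to midnight; the window ending
     * exactly at midnight is not handled by this sketch.
     */
    static LocalTime windowEnd(LocalTime ts, Duration size) {
        long sizeNanos = size.toNanos();
        long start = ts.toNanoOfDay() - (ts.toNanoOfDay() % sizeNanos);
        return LocalTime.ofNanoOfDay(start + sizeNanos);
    }

    /** window_time modeled as window_end minus one millisecond. */
    static LocalTime windowTime(LocalTime ts, Duration size) {
        return windowEnd(ts, size).minusNanos(1_000_000L);
    }

    /**
     * Returns the timestamps that are strictly after the watermark.
     * Assumption: anything at or before the watermark is dropped as late.
     */
    static List<LocalTime> survivors(List<LocalTime> timestamps, LocalTime watermark) {
        List<LocalTime> kept = new ArrayList<>();
        for (LocalTime ts : timestamps) {
            if (ts.isAfter(watermark)) {
                kept.add(ts);
            }
        }
        return kept;
    }
}
```

With a watermark of 12:00:08, ordering by {{ts}} keeps only timestamps after 12:00:08, whereas mapping every record to its {{window_time}} first keeps all of them, since 12:00:09.999 is ahead of the watermark.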
> h3. 2. Multiple rowtime columns in output
> Having two rowtime columns causes errors when writing to sinks:
> {code}
> TableException: The query contains more than one rowtime attribute column
> [window_time, ts] for writing into table '*anonymous_datastream_sink*'.
> {code}
> This was partially worked around in FLINK-24186 by relaxing the check for
> collect/print sinks, but the root cause was never fixed.
> h2. Root Cause
> The window TVF's output type derivation (in the planner's type inference for
> {{LogicalTableFunctionScan}}) preserves the {{TimeIndicatorRelDataType}} on
> the original time column. It should materialize the original time column to a
> regular {{TIMESTAMP}} type, keeping only {{window_time}} as {{*ROWTIME*}}.
> h2. Steps to Reproduce
> {code:java}
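> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();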
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
> EnvironmentSettings.newInstance().inStreamingMode().build());
> // Create source with rowtime
> tableEnv.executeSql(
> "CREATE TABLE source (" +
> " id STRING, ts TIMESTAMP(3)," +
> " WATERMARK FOR ts AS ts - INTERVAL '1' SECOND" +
> ") WITH ('connector' = 'datagen')");
> // Query: TUMBLE + OVER using original ts
> Table result = tableEnv.sqlQuery(
> "SELECT *, COUNT(*) OVER (PARTITION BY id ORDER BY ts ROWS UNBOUNDED PRECEDING) AS cnt " +
> "FROM TABLE(TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '10' SECOND))");
> // Inspect schema - ts should NOT be *ROWTIME*
> result.printSchema();
> // Actual: `ts` TIMESTAMP(3) *ROWTIME*
> // Expected: `ts` TIMESTAMP(3)
> {code}
> h2. Expected Behavior
> After window TVF, the output schema should be:
> {code}
> `ts` TIMESTAMP(3) <-- regular timestamp, NOT rowtime
> `window_time` TIMESTAMP(3) NOT NULL *ROWTIME* <-- sole rowtime attribute
> {code}
> The OVER aggregation {{ORDER BY ts}} should be *rejected* by the planner
> because {{ts}} is no longer a time attribute.
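
The expected behavior can be sketched with a toy schema model. This is not the planner's actual type inference: `WindowTvfSchemaSketch`, `Column`, `deriveOutput`, and `requireTimeAttribute` are hypothetical names, and a boolean flag stands in for the {{*ROWTIME*}} indicator.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of window TVF output-type derivation; not Flink planner code. */
final class WindowTvfSchemaSketch {

    /** Hypothetical column model: a name plus a flag standing in for *ROWTIME*. */
    static final class Column {
        final String name;
        final boolean rowtime;

        Column(String name, boolean rowtime) {
            this.name = name;
            this.rowtime = rowtime;
        }
    }

    /**
     * Derives the output columns of a window TVF. The descriptor column is
     * materialized to a regular timestamp; window_time becomes the rowtime.
     */
    static List<Column> deriveOutput(List<Column> input, String descriptorColumn) {
        List<Column> output = new ArrayList<>();
        for (Column column : input) {
            boolean materialize = column.name.equals(descriptorColumn);
            output.add(new Column(column.name, column.rowtime && !materialize));
        }
        output.add(new Column("window_start", false));
        output.add(new Column("window_end", false));
        output.add(new Column("window_time", true));
        return output;
    }

    /** Rejects an ORDER BY column that is not flagged as a time attribute. */
    static void requireTimeAttribute(List<Column> schema, String orderByColumn) {
        for (Column column : schema) {
            if (column.name.equals(orderByColumn) && column.rowtime) {
                return;
            }
        }
        throw new IllegalArgumentException(
                "ORDER BY column '" + orderByColumn + "' is not a time attribute");
    }
}
```

In this model, deriving the output for input columns {{id}} and {{ts}} (rowtime) leaves {{window_time}} as the only flagged column, so `requireTimeAttribute(schema, "window_time")` passes while `requireTimeAttribute(schema, "ts")` throws.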
> h2. Related Issues
> * FLINK-24186 - Worked around the "multiple rowtime columns" symptom for
> collect/print sinks
> * FLINK-38162 - Time attribute propagation issues with SQL functions after
> window TVF
> * FLINK-10211 - Broader issue with time indicator materialization
> * [FLIP-145|https://cwiki.apache.org/confluence/display/FLINK/FLIP-145:+Support+SQL+windowing+table-valued+function] - Original design specification for window TVFs