[ 
https://issues.apache.org/jira/browse/FLINK-38162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nic Townsend updated FLINK-38162:
---------------------------------
    Description: 
In the Flink docs for window join - 
[https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-join/#innerleftrightfull-outer]
 - it uses \{COALESCE} in a FULL JOIN to ensure the row has a value for 
\{window_start} and \{window_end}.

However, the example omits \{window_time} from the result - which is needed for 
downstream re-windowing, or cascading windows 
([https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#cascading-window-aggregation).]

An example use case might be "I have a two phased commit process - and I want 
to know how many transactions in a window were just LEFT, how many were RIGHT, 
and how many were COMPLETE". It possibly is very niche - the problem is that 
adding in \{window_time} causes a series of different errors to occur with 
downstream operators.
h4. Problem 1:

If you use {{COALESCE}} for \{window_time}, a {{DESCRIBE}} will show the column 
as being \{ROWTIME}:
{code:java}
+----------+------------------------+------+-----+--------+----------------------------------+
|     name |                   type | null | key | extras |                     
   watermark |
+----------+------------------------+------+-----+--------+----------------------------------+
| window_time | TIMESTAMP(3) *ROWTIME* | true |     |        | | {code}
But - if you use a Windowing TVF on the results of the join, Flink throws an 
error that {{window_time}} is not a time attribute.

This is (I assume) due to the use of \{COALESCE} : 
[https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/concepts/time_attributes/#introduction-to-time-attributes]
 says:

> As long as a time attribute is not modified, and is simply forwarded from one 
> part of a query to another, it remains a valid time attribute. Time 
> attributes behave like regular timestamps, and are accessible for 
> calculations. When used in calculations, time attributes are materialized and 
> act as standard timestamps. However, ordinary timestamps cannot be used in 
> place of, or be converted to, time attributes.
h4. Problem 2:

If you try to use cascading windows and perform window aggregation after the 
join, you do not get a windowed aggregate. Instead, the planner will create an 
unbounded aggregate. As with problem 1, the assumption based off 
([https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#cascading-window-aggregation)]
 is that the time attribute has not propagated, so the window does not cascade.

Problem 3:
{code:java}
Caused by: java.lang.IllegalArgumentException: Type mismatch: rel rowtype: 
RecordType(TIMESTAMP(3) agg_window_start, TIMESTAMP(3) agg_window_end, 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) agg_window_time, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" l_version, VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" r_version) NOT NULL equiv rowtype: RecordType(TIMESTAMP(3) 
agg_window_start, TIMESTAMP(3) agg_window_end, TIMESTAMP_LTZ(3) ROWTIME 
agg_window_time, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" l_version, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" r_version) NOT NULL Difference: 
agg_window_time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) -> TIMESTAMP_LTZ(3) ROWTIME 
{code}

  was:
In the Flink docs for window join - 
[https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-join/#innerleftrightfull-outer]
 - the outer join example does not include the use of {{window_time.}}

This means that you cannot perform a window aggregate for example on the result 
of the join - perhaps for example you want to count how many rows came from the 
left, right, or both in a given window.

 

If you use the {{COALESCE}} like in the example, then although a {{DESCRIBE}} 
will show
{code:java}
+----------+------------------------+------+-----+--------+----------------------------------+
|     name |                   type | null | key | extras |                     
   watermark |
+----------+------------------------+------+-----+--------+----------------------------------+
| window_time | TIMESTAMP(3) *ROWTIME* | true |     |        | | {code}
it will not work with a Window TVF and throws an error that {{window_time}} is 
not a time attribute. This is (I assume) due to: 
[https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/concepts/time_attributes/#introduction-to-time-attributes]
 where it says:

> As long as a time attribute is not modified, and is simply forwarded from one 
> part of a query to another, it remains a valid time attribute. Time 
> attributes behave like regular timestamps, and are accessible for 
> calculations. When used in calculations, time attributes are materialized and 
> act as standard timestamps. However, ordinary timestamps cannot be used in 
> place of, or be converted to, time attributes.

 

Even worse - if you build the following statement set (pseduo):
 * CREATE left table
 * CREATE right table
 * CREATE temporary view joined <WINDOW OUTER JOIN of left,right>
 * CREATE temporary view aggregate <WINDOW AGGREGATE of joined>
 * CREATE blackhole joined_sink <schema matches joined>
 * CREATE blackhole aggregate_sink <schema matches aggregate>
 * INSERT into joined_sink
 * INSERT into aggregate_sink

 

Then you end up with a stack trace from the planner of:
{code:java}
Caused by: java.lang.IllegalArgumentException: Type mismatch: rel rowtype: 
RecordType(TIMESTAMP(3) agg_window_start, TIMESTAMP(3) agg_window_end, 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) agg_window_time, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" l_version, VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" r_version) NOT NULL equiv rowtype: RecordType(TIMESTAMP(3) 
agg_window_start, TIMESTAMP(3) agg_window_end, TIMESTAMP_LTZ(3) ROWTIME 
agg_window_time, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" l_version, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" r_version) NOT NULL Difference: 
agg_window_time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) -> TIMESTAMP_LTZ(3) ROWTIME 
{code}


> Use of SQL functions with time attributes causes downstream temporal 
> functions to fail
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-38162
>                 URL: https://issues.apache.org/jira/browse/FLINK-38162
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0, 2.1.0
>            Reporter: Nic Townsend
>            Priority: Minor
>         Attachments: SQLSubmitter.java
>
>
> In the Flink docs for window join - 
> [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-join/#innerleftrightfull-outer]
>  - it uses \{COALESCE} in a FULL JOIN to ensure the row has a value for 
> \{window_start} and \{window_end}.
> However, the example omits \{window_time} from the result - which is needed 
> for downstream re-windowing, or cascading windows 
> ([https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#cascading-window-aggregation).]
> An example use case might be "I have a two phased commit process - and I want 
> to know how many transactions in a window were just LEFT, how many were 
> RIGHT, and how many were COMPLETE". It possibly is very niche - the problem 
> is that adding in \{window_time} causes a series of different errors to occur 
> with downstream operators.
> h4. Problem 1:
> If you use {{COALESCE}} for \{window_time}, a {{DESCRIBE}} will show the 
> column as being \{ROWTIME}:
> {code:java}
> +----------+------------------------+------+-----+--------+----------------------------------+
> |     name |                   type | null | key | extras |                   
>      watermark |
> +----------+------------------------+------+-----+--------+----------------------------------+
> | window_time | TIMESTAMP(3) *ROWTIME* | true |     |        | | {code}
> But - if you use a Windowing TVF on the results of the join, Flink throws an 
> error that {{window_time}} is not a time attribute.
> This is (I assume) due to the use of \{COALESCE} : 
> [https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/concepts/time_attributes/#introduction-to-time-attributes]
>  says:
> > As long as a time attribute is not modified, and is simply forwarded from 
> > one part of a query to another, it remains a valid time attribute. Time 
> > attributes behave like regular timestamps, and are accessible for 
> > calculations. When used in calculations, time attributes are materialized 
> > and act as standard timestamps. However, ordinary timestamps cannot be used 
> > in place of, or be converted to, time attributes.
> h4. Problem 2:
> If you try to use cascading windows and perform window aggregation after the 
> join, you do not get a windowed aggregate. Instead, the planner will create 
> an unbounded aggregate. As with problem 1, the assumption based off 
> ([https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#cascading-window-aggregation)]
>  is that the time attribute has not propagated, so the window does not 
> cascade.
> Problem 3:
> {code:java}
> Caused by: java.lang.IllegalArgumentException: Type mismatch: rel rowtype: 
> RecordType(TIMESTAMP(3) agg_window_start, TIMESTAMP(3) agg_window_end, 
> TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) agg_window_time, VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" l_version, VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" r_version) NOT NULL equiv rowtype: RecordType(TIMESTAMP(3) 
> agg_window_start, TIMESTAMP(3) agg_window_end, TIMESTAMP_LTZ(3) ROWTIME 
> agg_window_time, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" l_version, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" r_version) NOT NULL Difference: 
> agg_window_time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) -> TIMESTAMP_LTZ(3) 
> ROWTIME {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to