[https://issues.apache.org/jira/browse/FLINK-38162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Nic Townsend updated FLINK-38162:
---------------------------------
Description:
In the Flink docs for window join
([https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-join/#innerleftrightfull-outer]),
the outer join example does not include {{window_time}}.
This means that you cannot, for example, perform a window aggregate on the result
of the join, such as counting how many rows came from the left, the right, or both
in a given window.
If you apply {{COALESCE}} to {{window_time}}, as the example does for
{{window_start}} and {{window_end}}, then although {{DESCRIBE}} will show
{code:java}
+-------------+------------------------+------+-----+--------+-----------+
| name        | type                   | null | key | extras | watermark |
+-------------+------------------------+------+-----+--------+-----------+
| window_time | TIMESTAMP(3) *ROWTIME* | true |     |        |           |
{code}
it will not work with a window TVF: the planner throws an error that
{{window_time}} is not a time attribute. This is (I assume) due to:
[https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/concepts/time_attributes/#introduction-to-time-attributes]
where it says:
> As long as a time attribute is not modified, and is simply forwarded from one
> part of a query to another, it remains a valid time attribute. Time
> attributes behave like regular timestamps, and are accessible for
> calculations. When used in calculations, time attributes are materialized and
> act as standard timestamps. However, ordinary timestamps cannot be used in
> place of, or be converted to, time attributes.
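A minimal Flink SQL sketch of the pattern described above. It follows the FULL OUTER JOIN example from the window join docs and additionally carries {{window_time}} through {{COALESCE}}. The table names {{LeftTable}}/{{RightTable}}, the columns {{id}} and {{row_time}}, the {{join_*}} output names (renamed so they do not clash with the {{window_*}} columns that the second {{TUMBLE}} adds), and the window sizes are illustrative assumptions, not taken from the reporter's job.

{code:sql}
-- Hypothetical inputs: LeftTable and RightTable, each with a rowtime attribute row_time.
-- Window FULL OUTER JOIN as in the docs, plus window_time carried through COALESCE.
CREATE TEMPORARY VIEW joined AS
SELECT
  L.id AS l_id,
  R.id AS r_id,
  COALESCE(L.window_start, R.window_start) AS join_window_start,
  COALESCE(L.window_end, R.window_end) AS join_window_end,
  COALESCE(L.window_time, R.window_time) AS join_window_time
FROM (
  SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) L
FULL JOIN (
  SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R
ON L.id = R.id AND L.window_start = R.window_start AND L.window_end = R.window_end;

-- DESCRIBE lists join_window_time as a *ROWTIME* column, but per this report a
-- window TVF over it is rejected because it is "not a time attribute".
-- COUNT(col) counts non-NULL values, i.e. rows that had a left/right match.
SELECT window_start, window_end,
       COUNT(l_id) AS left_rows,
       COUNT(r_id) AS right_rows
FROM TABLE(TUMBLE(TABLE joined, DESCRIPTOR(join_window_time), INTERVAL '1' HOUR))
GROUP BY window_start, window_end;
{code}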
Even worse, if you build the following statement set (pseudocode):
* CREATE left table
* CREATE right table
* CREATE TEMPORARY VIEW joined <WINDOW OUTER JOIN of left, right>
* CREATE TEMPORARY VIEW aggregate <WINDOW AGGREGATE of joined>
* CREATE blackhole joined_sink <schema matches joined>
* CREATE blackhole aggregate_sink <schema matches aggregate>
* INSERT INTO joined_sink
* INSERT INTO aggregate_sink
then the planner fails with the following stack trace:
{code:java}
Caused by: java.lang.IllegalArgumentException: Type mismatch:
rel rowtype: RecordType(TIMESTAMP(3) agg_window_start, TIMESTAMP(3) agg_window_end, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) agg_window_time, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" l_version, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" r_version) NOT NULL
equiv rowtype: RecordType(TIMESTAMP(3) agg_window_start, TIMESTAMP(3) agg_window_end, TIMESTAMP_LTZ(3) ROWTIME agg_window_time, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" l_version, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" r_version) NOT NULL
Difference:
agg_window_time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) -> TIMESTAMP_LTZ(3) ROWTIME
{code}
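For reference, the statement set above might be written roughly as the following Flink SQL script, using the {{EXECUTE STATEMENT SET}} syntax of the SQL client. The {{datagen}} sources, all table, view and column names, and the window sizes are hypothetical. The stack trace suggests the reporter's rowtime columns are {{TIMESTAMP_LTZ(3)}} and that the aggregate carries {{l_version}}/{{r_version}} columns, whereas this sketch uses {{TIMESTAMP(3)}} and simple counts, so it approximates the setup rather than being a confirmed reproducer.

{code:sql}
-- Hypothetical sources; datagen and the 5 s watermark delay are placeholders.
CREATE TABLE LeftTable (
  id STRING,
  row_time TIMESTAMP(3),
  WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen');

CREATE TABLE RightTable (
  id STRING,
  row_time TIMESTAMP(3),
  WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen');

-- Window FULL OUTER JOIN with window_time carried through COALESCE.
CREATE TEMPORARY VIEW joined AS
SELECT
  L.id AS l_id,
  R.id AS r_id,
  COALESCE(L.window_start, R.window_start) AS join_window_start,
  COALESCE(L.window_end, R.window_end) AS join_window_end,
  COALESCE(L.window_time, R.window_time) AS join_window_time
FROM (
  SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) L
FULL JOIN (
  SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R
ON L.id = R.id AND L.window_start = R.window_start AND L.window_end = R.window_end;

-- Window aggregate over the join result, keyed on the COALESCE-d time column.
CREATE TEMPORARY VIEW aggregated AS
SELECT
  window_start AS agg_window_start,
  window_end AS agg_window_end,
  window_time AS agg_window_time,
  COUNT(l_id) AS left_rows,
  COUNT(r_id) AS right_rows
FROM TABLE(TUMBLE(TABLE joined, DESCRIPTOR(join_window_time), INTERVAL '1' HOUR))
GROUP BY window_start, window_end, window_time;

-- Blackhole sinks whose schemas match the two views.
CREATE TABLE joined_sink (
  l_id STRING,
  r_id STRING,
  join_window_start TIMESTAMP(3),
  join_window_end TIMESTAMP(3),
  join_window_time TIMESTAMP(3)
) WITH ('connector' = 'blackhole');

CREATE TABLE aggregate_sink (
  agg_window_start TIMESTAMP(3),
  agg_window_end TIMESTAMP(3),
  agg_window_time TIMESTAMP(3),
  left_rows BIGINT,
  right_rows BIGINT
) WITH ('connector' = 'blackhole');

-- Both INSERTs are planned together, which is where the reported
-- "Type mismatch" on agg_window_time surfaces.
EXECUTE STATEMENT SET
BEGIN
  INSERT INTO joined_sink SELECT * FROM joined;
  INSERT INTO aggregate_sink SELECT * FROM aggregated;
END;
{code}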
was:
In the Flink docs for window join
([https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-join/#innerleftrightfull-outer]),
the outer join example does not include {{window_time}}.
This means that you cannot, for example, perform a window aggregate on the result
of the join, such as counting how many rows came from the left, the right, or both
in a given window.
If you apply {{COALESCE}} to {{window_time}}, as the example does for
{{window_start}} and {{window_end}}, then although {{DESCRIBE}} will show
{code:java}
+-------------+------------------------+------+-----+--------+-----------+
| name        | type                   | null | key | extras | watermark |
+-------------+------------------------+------+-----+--------+-----------+
| window_time | TIMESTAMP(3) *ROWTIME* | true |     |        |           |
{code}
it will not work with a window TVF: the planner throws an error that
{{window_time}} is not a time attribute.
Even worse, if you build the following statement set (pseudocode):
* CREATE left table
* CREATE right table
* CREATE TEMPORARY VIEW joined <WINDOW OUTER JOIN of left, right>
* CREATE TEMPORARY VIEW aggregate <WINDOW AGGREGATE of joined>
* CREATE blackhole joined_sink <schema matches joined>
* CREATE blackhole aggregate_sink <schema matches aggregate>
* INSERT INTO joined_sink
* INSERT INTO aggregate_sink
then the planner fails with the following stack trace:
{code:java}
Caused by: java.lang.IllegalArgumentException: Type mismatch:
rel rowtype: RecordType(TIMESTAMP(3) agg_window_start, TIMESTAMP(3) agg_window_end, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) agg_window_time, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" l_version, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" r_version) NOT NULL
equiv rowtype: RecordType(TIMESTAMP(3) agg_window_start, TIMESTAMP(3) agg_window_end, TIMESTAMP_LTZ(3) ROWTIME agg_window_time, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" l_version, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" r_version) NOT NULL
Difference:
agg_window_time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) -> TIMESTAMP_LTZ(3) ROWTIME
{code}
> Use of SQL functions with time attributes causes downstream temporal functions to fail
> --------------------------------------------------------------------------------------
>
> Key: FLINK-38162
> URL: https://issues.apache.org/jira/browse/FLINK-38162
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.20.0
> Reporter: Nic Townsend
> Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)