[
https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Eduardo Breijo updated FLINK-36626:
-----------------------------------
Description:
I found a behavior change in Flink SQL temporal lookup joins when migrating from Flink 1.15 to Flink 1.18 or Flink 1.20. I haven't been able to pinpoint the cause, but it makes the query below produce different results.
*Flink SQL Query:*
{code:sql}
WITH assets_setpoint AS (
  SELECT
    asset_id,
    TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,
    TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,
    LAST_VALUE(`value`) AS `value`
  FROM asset_readings
  JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
  WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')
  GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id
),
assets_supply_air_temp AS (
  -- CAST needed to perform regular joins instead of temporal joins in the outer query
  SELECT CAST(asset_readings.`timestamp` AS TIMESTAMP) AS `timestamp`,
    asset_readings.asset_id,
    asset_readings.`value` AS `value`
  FROM asset_readings
  -- Metrics temporal lookup inner join
  JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
  -- Assets to ignore for this computed metric definition temporal lookup left join
  LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`
    ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId
    AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id
  WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')
    -- Filter assets not present in the asset to ignore for this computed metric definition table
    AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL
)
SELECT
  assets_supply_air_temp.`timestamp`,
  assets_supply_air_temp.asset_id,
  assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`
FROM assets_supply_air_temp
INNER JOIN assets_setpoint
  ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id
  AND assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp
{code}
*Schema:*
{noformat}
+-----------+-----------------------------+-------+-----+---------------+--------------------+
| name      | type                        | null  | key | extras        | watermark          |
+-----------+-----------------------------+-------+-----+---------------+--------------------+
| timestamp | TIMESTAMP_LTZ(3) *ROWTIME*  | TRUE  |     |               | SOURCE_WATERMARK() |
| asset_id  | BIGINT                      | TRUE  |     |               |                    |
| metric_id | INT                         | TRUE  |     |               |                    |
| value     | DOUBLE                      | TRUE  |     |               |                    |
| metadata  | MAP<STRING, STRING>         | TRUE  |     |               |                    |
| proctime  | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |                    |
+-----------+-----------------------------+-------+-----+---------------+--------------------+
6 rows in set

+------------------------------------------------+
| table name                                     |
+------------------------------------------------+
| asset_readings                                 |
| asset_relationship_parent_to_unit              |
| asset_to_ignore_per_computed_metric_definition |
| metric                                         |
+------------------------------------------------+
{noformat}
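To make the setup easier to reproduce, the DESCRIBE output above can be expressed as a DDL sketch. This is a reconstruction under assumptions, not the actual table definition: the described table is not named (its columns match those the query uses on asset_readings), the WITH options are placeholders, and the exact option keys depend on which Kinesis connector version is deployed.

{code:sql}
-- Hypothetical DDL reconstructed from the DESCRIBE output above.
-- Assumptions: the described table is asset_readings; all WITH options
-- are placeholders, and 'format' = 'json' is a guess.
CREATE TABLE asset_readings (
  `timestamp` TIMESTAMP_LTZ(3),
  asset_id    BIGINT,
  metric_id   INT,
  `value`     DOUBLE,
  `metadata`  MAP<STRING, STRING>,
  -- Processing-time attribute used by the FOR SYSTEM_TIME AS OF lookup joins
  `proctime` AS PROCTIME(),
  -- Event-time attribute; watermarks are taken from the source
  WATERMARK FOR `timestamp` AS SOURCE_WATERMARK()
) WITH (
  'connector'  = 'kinesis',
  'stream'     = '<stream-name>',
  'aws.region' = '<region>',
  'format'     = 'json'
);
{code}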
Results:
* On Flink 1.15, the difference ({{assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`}}) between assets_supply_air_temp and assets_setpoint is computed correctly for every row of assets_supply_air_temp (note that this subquery performs no window-based grouping, so its rows are raw readings).
* On Flink 1.18+, the same query always produces a difference of 0.
* On Flink 1.18+, changing the join against the metric lookup table to a regular join (removing {{FOR SYSTEM_TIME AS OF `proctime`}}) makes the query output the correct values. However, this causes a performance hit, because the asset_readings table is built from a Kinesis data stream and the metric table can change over time.
* Please see the attached "Tasks Execution Plan.txt" file for the difference in temporal joins between Flink 1.15 and Flink 1.20.
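One way to regenerate plans like those in the attached file is to run Flink SQL's EXPLAIN statement on each version. A sketch, assuming the tables are registered exactly as in the query above: it explains only the assets_setpoint CTE body so the two plans stay small enough to diff. In the optimized plan, the temporal lookup join typically appears as a LookupJoin node.

{code:sql}
-- Run on both Flink 1.15 and Flink 1.20 and diff the printed plans.
-- Assumes asset_readings and metric are registered as in the report.
EXPLAIN PLAN FOR
SELECT
  asset_id,
  TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,
  TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,
  LAST_VALUE(`value`) AS `value`
FROM asset_readings
-- Temporal lookup join under investigation
JOIN metric FOR SYSTEM_TIME AS OF `proctime`
  ON metric.metric_id = asset_readings.metric_id
WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')
GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id;
{code}

The same can be done with the assets_supply_air_temp CTE body, which additionally contains the lookup LEFT JOIN against asset_to_ignore_per_computed_metric_definition; explaining the full query instead shows how the two CTEs are combined.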
I have tried rewriting the query in several ways that still use temporal joins, but I have not found a workaround and I don't know why this is happening. Attached you will find a file with the different SQL variants I tried, none of which worked.
Any help would be appreciated.
was:
I found a behavior change in Flink SQL temporal lookup joins when migrating from Flink 1.15 to Flink 1.18+. I haven't been able to pinpoint the cause, but it makes the query below produce different results.
> Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.20
> -----------------------------------------------------------------------------
>
> Key: FLINK-36626
> URL: https://issues.apache.org/jira/browse/FLINK-36626
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.18.1, 1.20.0
> Environment: AWS Managed Apache Flink
> Reporter: Eduardo Breijo
> Priority: Critical
> Attachments: Tasks Execution Plan.txt
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)