[https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Eduardo Breijo updated FLINK-36626:
-----------------------------------
Description:
When migrating from Flink 1.15 to Flink 1.18+, I found a behavior change in Flink SQL temporal lookup joins. I haven't been able to pinpoint it, and it causes the query below to produce different results.
*Flink SQL Query:*
{code:sql}
WITH assets_setpoint AS (
  SELECT
    asset_id,
    TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,
    TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,
    LAST_VALUE(`value`) AS `value`
  FROM asset_readings
  JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
  WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')
  GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id
)
SELECT
  assets_supply_air_temp.`timestamp`,
  assets_supply_air_temp.asset_id,
  assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`
FROM (
  SELECT
    asset_readings.`timestamp`,
    asset_readings.asset_id,
    asset_readings.`value` AS `value`
  FROM asset_readings
  -- Metrics temporal lookup inner join
  JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
  -- Assets to ignore for this computed metric definition temporal lookup left join
  LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`
    ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId
    AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id
  WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')
    -- Filter assets not present in the asset to ignore for this computed metric definition table
    AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL
) AS assets_supply_air_temp
INNER JOIN assets_setpoint
  ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id
WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp
{code}
*Schema:*
{noformat}
+-----------+-----------------------------+-------+-----+---------------+--------------------+
| name      | type                        | null  | key | extras        | watermark          |
+-----------+-----------------------------+-------+-----+---------------+--------------------+
| timestamp | TIMESTAMP_LTZ(3) *ROWTIME*  | TRUE  |     |               | SOURCE_WATERMARK() |
| asset_id  | BIGINT                      | TRUE  |     |               |                    |
| metric_id | INT                         | TRUE  |     |               |                    |
| value     | DOUBLE                      | TRUE  |     |               |                    |
| metadata  | MAP<STRING, STRING>         | TRUE  |     |               |                    |
| proctime  | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |                    |
+-----------+-----------------------------+-------+-----+---------------+--------------------+
6 rows in set

+------------------------------------------------+
| table name                                     |
+------------------------------------------------+
| asset_readings                                 |
| asset_relationship_parent_to_unit              |
| asset_to_ignore_per_computed_metric_definition |
| metric                                         |
+------------------------------------------------+
{noformat}
Results:
* On Flink 1.15, the difference ({{assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`}}) between assets_supply_air_temp and assets_setpoint is computed correctly for every row of assets_supply_air_temp (note that this subquery does not perform any window-based grouping, so it returns raw readings).
* On Flink 1.18+, the same query always returns 0 for this difference.
* On Flink 1.18+, changing the query to use a regular join against the metric lookup table (removing {{FOR SYSTEM_TIME AS OF `proctime`}}) makes it output the correct value, but I don't think a regular join is what I need here, because the metric table can change over time.
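To make the expected output concrete, here is a minimal Python sketch of the semantics the query is meant to have. It is an illustrative model only, not Flink code: all function and variable names are hypothetical, and it assumes the inputs have already been filtered by metric alias and by the assets-to-ignore table. It models the assets_setpoint CTE as a 1-minute tumbling window per asset that keeps the last setpoint value (LAST_VALUE in arrival order), then joins each raw supply-air reading to the setpoint window of the same asset whose [start_timestamp, end_timestamp] range contains it. With the sample data the expected differences are non-zero, in line with the Flink 1.15 behavior described above, whereas the reported Flink 1.18+ behavior would yield 0 for every row.

```python
from datetime import datetime, timedelta

# Hypothetical helpers modelling the intended query semantics (not Flink APIs).
WINDOW = timedelta(minutes=1)


def window_start(ts: datetime) -> datetime:
    """Start of the 1-minute tumbling window containing ts (like TUMBLE_START)."""
    return ts.replace(second=0, microsecond=0)


def setpoint_windows(setpoints):
    """Model the assets_setpoint CTE.

    setpoints: (timestamp, asset_id, value) tuples in arrival order, assumed to be
    already filtered to the 'Parameters.Main.SupplyAirTempSetpoint' metric.
    Returns {(asset_id, window_start): (start, end, last_value)}.
    """
    windows = {}
    for ts, asset_id, value in setpoints:
        start = window_start(ts)
        # Later rows overwrite earlier ones, mimicking LAST_VALUE(`value`).
        windows[(asset_id, start)] = (start, start + WINDOW, value)
    return windows


def expected_deltas(supply_temps, setpoints):
    """Model the outer query: supply-air reading minus the matching setpoint.

    supply_temps: raw (timestamp, asset_id, value) readings, assumed to be already
    filtered to the supply-air metric with ignored assets removed.
    """
    windows = setpoint_windows(setpoints)
    out = []
    for ts, asset_id, value in supply_temps:
        for (w_asset, _), (start, end, setpoint) in windows.items():
            # INNER JOIN ON asset_id, then WHERE ts BETWEEN start AND end (inclusive).
            if w_asset == asset_id and start <= ts <= end:
                out.append((ts, asset_id, value - setpoint))
    return out


if __name__ == "__main__":
    t0 = datetime(2024, 1, 1, 12, 0, 0)
    setpoints = [(t0 + timedelta(seconds=10), 1, 20.0),
                 (t0 + timedelta(seconds=40), 1, 21.0)]
    supply = [(t0 + timedelta(seconds=15), 1, 23.5),
              (t0 + timedelta(seconds=50), 1, 22.0)]
    print([delta for _, _, delta in expected_deltas(supply, setpoints)])  # [2.5, 1.0]
```

Because BETWEEN is inclusive on both ends, a reading that falls exactly on a window's end_timestamp also matches that window (and may match the next window starting at the same instant).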
I have tried rewriting the query in several different ways with temporal joins, but I have not found a workaround and I don't know why this is happening. The attached file (Flink-SQL-query.txt) contains the different SQL variants I have tried, without success.
Any help would be appreciated.
was:
When migrating from Flink 1.15 to Flink 1.18+, I found a behavior change in Flink SQL temporal joins. I haven't been able to pinpoint it, and it causes the query below to produce different results.
*Flink SQL Query:*
{code:sql}
WITH assets_setpoint AS (
  SELECT
    asset_id,
    TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,
    TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,
    LAST_VALUE(`value`) AS `value`
  FROM asset_readings
  JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
  WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')
  GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id
)
SELECT
  assets_supply_air_temp.`timestamp`,
  assets_supply_air_temp.asset_id,
  assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`
FROM (
  SELECT
    asset_readings.`timestamp`,
    asset_readings.asset_id,
    asset_readings.`value` AS `value`
  FROM asset_readings
  -- Metrics temporal lookup inner join
  JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
  -- Assets to ignore for this computed metric definition temporal lookup left join
  LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`
    ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId
    AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id
  WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')
    -- Filter assets not present in the asset to ignore for this computed metric definition table
    AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL
) AS assets_supply_air_temp
INNER JOIN assets_setpoint
  ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id
WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp
{code}
*Schema:*
{noformat}
+-----------+-----------------------------+-------+-----+---------------+--------------------+
| name      | type                        | null  | key | extras        | watermark          |
+-----------+-----------------------------+-------+-----+---------------+--------------------+
| timestamp | TIMESTAMP_LTZ(3) *ROWTIME*  | TRUE  |     |               | SOURCE_WATERMARK() |
| asset_id  | BIGINT                      | TRUE  |     |               |                    |
| metric_id | INT                         | TRUE  |     |               |                    |
| value     | DOUBLE                      | TRUE  |     |               |                    |
| metadata  | MAP<STRING, STRING>         | TRUE  |     |               |                    |
| proctime  | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |                    |
+-----------+-----------------------------+-------+-----+---------------+--------------------+
6 rows in set

+------------------------------------------------+
| table name                                     |
+------------------------------------------------+
| asset_readings                                 |
| asset_relationship_parent_to_unit              |
| asset_to_ignore_per_computed_metric_definition |
| metric                                         |
+------------------------------------------------+
{noformat}
Results:
* On Flink 1.15, the difference ({{assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`}}) between assets_supply_air_temp and assets_setpoint is computed correctly for every row of assets_supply_air_temp (note that this subquery does not perform any window-based grouping, so it returns raw readings).
* On Flink 1.18+, the same query always returns 0 for this difference.
* On Flink 1.18+, changing the query to use a regular join against the metric lookup table (removing {{FOR SYSTEM_TIME AS OF `proctime`}}) makes it output the correct value, but I don't think a regular join is what I need here, because the metric table can change over time.
I have tried rewriting the query in several different ways with temporal joins, but I have not found a workaround and I don't know why this is happening. The attached file (Flink-SQL-query.txt) contains the different SQL variants I have tried, without success.
Any help would be appreciated.
> Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+
> ------------------------------------------------------------------------------
>
> Key: FLINK-36626
> URL: https://issues.apache.org/jira/browse/FLINK-36626
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.18.1, 1.20.0
> Environment: AWS Managed Apache Flink
> Reporter: Eduardo Breijo
> Priority: Critical
> Attachments: Flink-SQL-query.txt
--
This message was sent by Atlassian Jira
(v8.20.10#820010)