Is it possible to add an extra field to the ORDER BY, or to make multiple records that share the same ORDER BY value each return their own individual value? I have checked the LAG code, and the problem is that when multiple records have exactly the same value in the ORDER BY, they are processed as a single block, resulting in "curious" behavior.
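To make the tie behavior concrete, here is a small Python model. This is an illustration of the observed semantics only, not Flink's actual implementation: the assumption is that in stream mode all rows sharing the same ORDER BY timestamp are accumulated into one block before any of them is emitted, so each of them sees the block's last value as its "previous" value, whereas per-row LAG would look one physical row back.

```python
# Toy model of the two LAG semantics discussed in this thread.
# Assumption (not Flink source code): in stream mode all rows with the
# same ORDER BY timestamp are accumulated into one block before emission.
from itertools import groupby

def lag_per_row(rows):
    """Per-row LAG (the expected result): look one physical row back."""
    vals = [v for _, v in rows]
    return [None] + vals[:-1]

def lag_per_block(rows):
    """Block-at-a-time LAG (the observed result): every row in a block of
    equal timestamps sees the block's last value as its 'previous' value."""
    out = []
    for _, block in groupby(rows, key=lambda r: r[0]):
        block = list(block)
        out.extend([block[-1][1]] * len(block))
    return out

rows = [(1739996380000, [1]), (1739996380000, [1])]  # two ties, as in the thread
print(lag_per_row(rows))    # [None, [1]]  -> matches the expected result
print(lag_per_block(rows))  # [[1], [1]]   -> matches the actual result
```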
On Thu, Feb 20, 2025 at 3:56 PM, Guillermo Ortiz Fernández (<guillermo.ortiz.f...@gmail.com>) wrote:

> And another option I tried is:
>
> WITH ranked AS (
>     SELECT *
>     FROM (
>         SELECT
>             msisdn,
>             eventTimestamp,
>             zoneIds,
>             ts,
>             ROW_NUMBER() OVER (PARTITION BY msisdn, ts ORDER BY ts) AS rownum
>         FROM example)
>     WHERE rownum = 1
> )
> SELECT
>     msisdn,
>     eventTimestamp,
>     ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts), ARRAY[-1]), -1) AS prev_zoneIds,
>     ts
> FROM ranked;
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: StreamPhysicalOverAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate(keep=[FirstRow], key=[msisdn, ts], order=[ROWTIME])
>
> On Thu, Feb 20, 2025 at 12:00 PM, Guillermo Ortiz Fernández (<guillermo.ortiz.f...@gmail.com>) wrote:
>
>> Another option would be to add an extra field, like:
>>
>> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY *ts, row_number*), ARRAY[-1]), -1) AS prev_zoneIds,
>>
>> But that isn't possible either.
>>
>> On Thu, Feb 20, 2025 at 11:48 AM, Guillermo Ortiz Fernández (<guillermo.ortiz.f...@gmail.com>) wrote:
>>
>>> I tried to create an additional field based on ROW_NUMBER, but it can't be used in the LAG function.
>>> The idea was:
>>>
>>> WITH ranked AS (
>>>     SELECT
>>>         msisdn,
>>>         eventTimestamp,
>>>         zoneIds,
>>>         ts,
>>>         TO_TIMESTAMP_LTZ(ROW_NUMBER() OVER (PARTITION BY msisdn ORDER BY ts), 3) AS ts_ranked
>>>     FROM example_2
>>> )
>>> SELECT
>>>     msisdn,
>>>     eventTimestamp,
>>>     ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>>>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts_ranked), ARRAY[-1]), -1) AS prev_zoneIds,
>>>     ts
>>> FROM ranked;
>>>
>>> RANKED subselect:
>>>
>>> msisdn     eventTimestamp  zoneIds  ts                       row_num  ts_ranked
>>> 673944959  1739996380000   [1]      2025-02-19 21:19:40.000  1        1970-01-01 01:00:00.001
>>> 673944959  1739996380000   [1]      2025-02-19 21:19:40.000  2        1970-01-01 01:00:00.002
>>>
>>> But: [ERROR] Could not execute SQL statement. Reason:
>>> org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.
>>>
>>> I guess I can only use the "ts" field, because it is the time attribute?
>>>
>>> On Thu, Feb 20, 2025 at 10:41 AM, Alessio Bernesco Làvore (<alessio.berne...@gmail.com>) wrote:
>>>
>>>> Hello,
>>>> I don't know if it's strictly related, but using a TIMESTAMP_LTZ vs. TIMESTAMP ts I ran into some behaviors I couldn't understand.
>>>>
>>>> Changing the ts to TIMESTAMP (not LTZ), the identical query works as expected.
>>>> So, first of all, we changed all the parsing to TIMESTAMP to exclude this kind of problem.
>>>>
>>>> If anyone is interested, I can provide the exact query to reproduce the problem we found.
>>>>
>>>> Greetings,
>>>> Alessio
>>>>
>>>> On Thu, Feb 20, 2025 at 10:31 AM Guillermo Ortiz Fernández <guillermo.ortiz.f...@gmail.com> wrote:
>>>>
>>>>> I have created a table that reads from a Kafka topic. What I want to do is order the data by eventTime and add a new field that represents the previous value, using the LAG function.
>>>>> The problem arises when two records have exactly the same eventTime, which produces "strange" behavior.
>>>>>
>>>>> CREATE TABLE example (
>>>>>     eventTimestamp BIGINT NOT NULL,
>>>>>     msisdn INT NOT NULL,
>>>>>     zoneIds ARRAY<INT NOT NULL> NOT NULL,
>>>>>     ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
>>>>>     `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
>>>>>     WATERMARK FOR ts AS ts
>>>>> ) WITH (
>>>>>     'connector' = 'kafka',
>>>>>     'topic' = 'example-offset',
>>>>>     'properties.bootstrap.servers' = 'xxxx',
>>>>>     'properties.auto.offset.reset' = 'latest',
>>>>>     'scan.startup.mode' = 'latest-offset',
>>>>>     'key.format' = 'raw',
>>>>>     'key.fields' = 'msisdn',
>>>>>     'value.format' = 'avro',
>>>>>     'value.fields-include' = 'ALL',
>>>>>     'scan.watermark.idle-timeout' = '1000'
>>>>> );
>>>>>
>>>>> INSERT INTO example (eventTimestamp, msisdn, zoneIds) VALUES
>>>>>     (1739996380000, 673944959, ARRAY[1]),
>>>>>     (1739996380000, 673944959, ARRAY[1]);
>>>>>
>>>>> SELECT
>>>>>     msisdn,
>>>>>     eventTimestamp,
>>>>>     ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>>>>>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts), ARRAY[-1]), -1) AS prev_zoneIds,
>>>>>     ts
>>>>> FROM example;
>>>>>
>>>>> Actual result:
>>>>>
>>>>> msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
>>>>> 673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
>>>>> 673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
>>>>>
>>>>> Expected result:
>>>>>
>>>>> msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
>>>>> 673944959  1739996380000   [1]      []            2025-02-19 21:19:40.000
>>>>> 673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
>>>>>
>>>>> Is this behavior normal?
>>>>>
>>>>> I am trying to achieve the expected behavior by including the offset metadata in the example table and adding it to the OVER clause in the LAG function.
>>>>> However, it seems that Flink does not allow ordering by more than one column:
>>>>>
>>>>> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds,
>>>>>
>>>>> results in:
>>>>>
>>>>> [ERROR] Could not execute SQL statement. Reason:
>>>>> org.apache.flink.table.api.TableException: The window can only be ordered by a single time column.
>>>>>
>>>>> Would you happen to know how to achieve the expected result?
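One way to get the expected per-row result is to do the tie-breaking outside the OVER clause. The sketch below assumes duplicates can be ordered deterministically by kafka_offset; in a real job this logic would live in a DataStream/ProcessFunction or in post-processing, since Flink's streaming OVER window only accepts a single time attribute. Field names follow the thread's example; this is not a Flink API.

```python
# Sketch: per-partition LAG with (ts, kafka_offset) as a composite sort key,
# so records with identical timestamps still get individual previous values.
def lag_with_tiebreaker(records):
    """records: dicts with ts, kafka_offset and zoneIds for one msisdn."""
    out = []
    prev = []  # empty list stands in for ARRAY_REMOVE(IFNULL(NULL, ...), -1)
    for rec in sorted(records, key=lambda r: (r["ts"], r["kafka_offset"])):
        out.append({**rec, "prev_zoneIds": prev})
        prev = rec["zoneIds"]
    return out

records = [
    {"ts": 1739996380000, "kafka_offset": 0, "zoneIds": [1]},
    {"ts": 1739996380000, "kafka_offset": 1, "zoneIds": [1]},
]
for row in lag_with_tiebreaker(records):
    print(row["zoneIds"], row["prev_zoneIds"])
# [1] []
# [1] [1]
```

This reproduces the expected result from the thread: the first duplicate gets an empty previous value and the second sees the first.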