Is it possible to add an extra field to the ORDER BY, or to have multiple
records with the same ORDER BY value each return their own individual
value?
I have checked the LAG code, and the problem is that when multiple records
have exactly the same value in the ORDER BY, they are processed as a single
block, which results in the "curious" behavior.
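One idea I have not tried yet, based on my reading of Flink's Window
Deduplication (an untested sketch; window deduplication is documented as
append-only, and window_time stays a time attribute that LAG should be able
to order by, which might avoid both errors below). Note that, like the
"ranked" attempt below, it collapses the duplicate rows instead of giving
each one its own value:

WITH deduped AS (
  -- Window deduplication: append-only output, unlike plain deduplication,
  -- and window_time remains a rowtime attribute usable by the OVER window.
  SELECT msisdn, eventTimestamp, zoneIds, ts, window_time
  FROM (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY window_start, window_end, msisdn, ts
            ORDER BY ts) AS rownum
    FROM TABLE(TUMBLE(TABLE example, DESCRIPTOR(ts), INTERVAL '1' SECOND))
  )
  WHERE rownum = 1
)
SELECT
    msisdn,
    eventTimestamp,
    ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
    ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn
        ORDER BY window_time), ARRAY[-1]), -1) AS prev_zoneIds,
    ts
FROM deduped;

The 1-second tumble size is arbitrary; any size should do, because the
deduplication key still includes ts itself.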

On Thu, 20 Feb 2025 at 15:56, Guillermo Ortiz Fernández (<
guillermo.ortiz.f...@gmail.com>) wrote:

> Another option I tried is:
>
> WITH ranked AS (
>   select *
>   FROM (
>     SELECT
>         msisdn,
>         eventTimestamp,
>         zoneIds,
>         ts,
>         ROW_NUMBER() OVER (PARTITION BY msisdn,ts ORDER BY ts) AS rownum
>     FROM example)
>   WHERE rownum = 1
> )
> SELECT
>     msisdn,
>     eventTimestamp,
>     ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY
> ts), ARRAY[-1]), -1) AS prev_zoneIds,
>     ts
> FROM ranked;
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: StreamPhysicalOverAggregate
> doesn't support consuming update and delete changes which is produced by
> node Deduplicate(keep=[FirstRow], key=[msisdn, ts], order=[ROWTIME])
>
> On Thu, 20 Feb 2025 at 12:00, Guillermo Ortiz Fernández (<
> guillermo.ortiz.f...@gmail.com>) wrote:
>
>> Another option would be to add an extra field, like:
>>
>>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER
>> BY ts, row_number), ARRAY[-1]), -1) AS prev_zoneIds,
>>
>> But it isn't possible either.
>>
>> On Thu, 20 Feb 2025 at 11:48, Guillermo Ortiz Fernández (<
>> guillermo.ortiz.f...@gmail.com>) wrote:
>>
>>> I tried to create an additional field based on ROW_NUMBER, but it can't be
>>> used in the LAG function.
>>>
>>> The idea was:
>>> WITH ranked AS (
>>>     SELECT
>>>         msisdn,
>>>         eventTimestamp,
>>>         zoneIds,
>>>         ts,
>>>         TO_TIMESTAMP_LTZ(ROW_NUMBER() OVER (PARTITION BY msisdn ORDER BY
>>> ts), 3) AS ts_ranked
>>>     FROM example_2
>>> )
>>> SELECT
>>>     msisdn,
>>>     eventTimestamp,
>>>     ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>>>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER
>>> BY ts_ranked), ARRAY[-1]), -1) AS prev_zoneIds,
>>>     ts
>>> FROM ranked;
>>>
>>> RANKED subselect:
>>> msisdn      eventTimestamp   zoneIds   ts                        row_num   ts_ranked
>>> 673944959   1739996380000    [1]       2025-02-19 21:19:40.000   1         1970-01-01 01:00:00.001
>>> 673944959   1739996380000    [1]       2025-02-19 21:19:40.000   2         1970-01-01 01:00:00.002
>>>
>>> But: [ERROR] Could not execute SQL statement. Reason:
>>> org.apache.flink.table.api.TableException: OVER windows' ordering in
>>> stream mode must be defined on a time attribute.
>>>
>>> I guess I can only use the "ts" field, because it is the time attribute?
>>>
>>>
>>> On Thu, 20 Feb 2025 at 10:41, Alessio Bernesco Làvore (<
>>> alessio.berne...@gmail.com>) wrote:
>>>
>>>> Hello,
>>>> I don't know if it's strictly related, but using a TIMESTAMP_LTZ ts vs. a
>>>> TIMESTAMP ts, I ran into some behaviors I could not understand.
>>>>
>>>> After changing ts to TIMESTAMP (not LTZ), the identical query works as
>>>> expected. So, first of all, we changed all the parsing to TIMESTAMP to
>>>> rule out this kind of problem.
>>>>
>>>> If anyone is interested, I can provide the exact query to reproduce the
>>>> problem we found.
>>>>
>>>> Greetings,
>>>> Alessio
>>>>
>>>> On Thu, Feb 20, 2025 at 10:31 AM Guillermo Ortiz Fernández <
>>>> guillermo.ortiz.f...@gmail.com> wrote:
>>>>
>>>>> I have created a table that reads from a Kafka topic. What I want to
>>>>> do is order the data by eventTime and add a new field that represents
>>>>> the previous value using the LAG function.
>>>>>
>>>>> The problem arises when two records have exactly the same eventTime,
>>>>> which produces a "strange" behavior.
>>>>>
>>>>>
>>>>> CREATE TABLE example (
>>>>>     eventTimestamp BIGINT NOT NULL,
>>>>>     msisdn INT NOT NULL,
>>>>>     zoneIds ARRAY<INT NOT NULL> NOT NULL,
>>>>>     ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
>>>>>     `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
>>>>>     WATERMARK FOR ts AS ts
>>>>> ) WITH (
>>>>>     'connector' = 'kafka',
>>>>>     'topic' = 'example-offset',
>>>>>     'properties.bootstrap.servers' = 'xxxx',
>>>>>     'properties.auto.offset.reset' = 'latest',
>>>>>     'scan.startup.mode' = 'latest-offset',
>>>>>     'key.format' = 'raw',
>>>>>     'key.fields' = 'msisdn',
>>>>>     'value.format' = 'avro',
>>>>>     'value.fields-include' = 'ALL',
>>>>>     'scan.watermark.idle-timeout' = '1000'
>>>>> );
>>>>>
>>>>>
>>>>> INSERT INTO example (eventTimestamp, msisdn, zoneIds) VALUES
>>>>> (1739996380000, 673944959, ARRAY[1]),
>>>>> (1739996380000, 673944959, ARRAY[1]);
>>>>>
>>>>>
>>>>> SELECT
>>>>>     msisdn,
>>>>>     eventTimestamp,
>>>>>     ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>>>>>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER 
>>>>> BY ts, kafka_offset), ARRAY[-1]), -1)  AS prev_zoneIds,
>>>>>     ts
>>>>> FROM example;
>>>>>
>>>>> Actual Result:
>>>>> msisdn      eventTimestamp   zoneIds   prev_zoneIds   ts
>>>>> 673944959   1739996380000    [1]       [1]            2025-02-19 21:19:40.000
>>>>> 673944959   1739996380000    [1]       [1]            2025-02-19 21:19:40.000
>>>>>
>>>>> Expected Result:
>>>>> msisdn      eventTimestamp   zoneIds   prev_zoneIds   ts
>>>>> 673944959   1739996380000    [1]       [ ]            2025-02-19 21:19:40.000
>>>>> 673944959   1739996380000    [1]       [1]            2025-02-19 21:19:40.000
>>>>> ------------------------------
>>>>> Is this behavior normal?
>>>>>
>>>>> I am trying to achieve the expected behavior by including the offset
>>>>> metadata column in the example table and adding it to the OVER clause
>>>>> of the LAG function. However, it seems that Flink does not allow
>>>>> ordering by more than one column:
>>>>>
>>>>>
>>>>> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY 
>>>>> ts, kafka_offset), ARRAY[-1]), -1)  AS prev_zoneIds,
>>>>>
>>>>> Results in:
>>>>>
>>>>> [ERROR] Could not execute SQL statement. Reason:
>>>>> org.apache.flink.table.api.TableException: The window can only be ordered 
>>>>> by a single time column.
>>>>>
>>>>> ------------------------------
>>>>>
>>>>> Would you happen to know how to achieve the expected result?
>>>>>
>>>>
