Hi Nick,
Could you try cascading window aggregation [1] and see whether it meets your needs?


[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#cascading-window-aggregation
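
To make the idea concrete, here is a minimal plain-Python sketch (not PyFlink) of what cascading means: the first window emits one row per group stamped with a window time, and the second window is defined on that timestamp instead of the original `proc_time`. The names `tumble_stage` and `slide_stage`, the `(key, ts_ms, value)` row layout and the summing are all made up for illustration; stamping rows with the window end minus 1 ms mirrors how Flink's windowing TVFs define `window_time`. In the Table API, I believe the equivalent is to select a time attribute from the window alias in the first stage (for example `col("tumble_window").proctime`) and use that column in the second `.on()`, but please check that against the 1.15 docs.

```python
from collections import defaultdict


def tumble_stage(rows, size_ms):
    """Stage 1: sum values per key in tumbling windows of size_ms.

    Each output row is (key, window_time, total). window_time is the
    window end minus 1 ms, so the row still falls inside its own window.
    It is the timestamp handed on to the next stage.
    """
    sums = defaultdict(float)
    for key, ts_ms, value in rows:
        window_start = ts_ms - ts_ms % size_ms
        sums[(key, window_start + size_ms - 1)] += value
    return [(key, wt, total) for (key, wt), total in sorted(sums.items())]


def slide_stage(rows, size_ms, step_ms):
    """Stage 2: sum stage-1 rows per key in sliding windows.

    A row belongs to every window [start, start + size_ms) that
    contains its timestamp; window starts are aligned to step_ms.
    """
    sums = defaultdict(float)
    for key, ts_ms, value in rows:
        start = ts_ms - ts_ms % step_ms  # latest window start <= ts_ms
        while start > ts_ms - size_ms:
            sums[(key, start, start + size_ms)] += value
            start -= step_ms
    return dict(sums)


# Hypothetical input: (key, timestamp in ms, value)
events = [("a", 10_100, 1.0), ("a", 11_900, 2.0), ("a", 12_500, 4.0), ("b", 10_300, 8.0)]

first_out = tumble_stage(events, size_ms=2_000)
second_out = slide_stage(first_out, size_ms=4_000, step_ms=2_000)

for window, total in sorted(second_out.items()):
    print(window, total)
```

The point is only that the second stage never sees the original event timestamps: it windows on the time produced by the first stage.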




--

    Best!
    Xuyang




At 2024-03-16 06:21:52, "Nick Hecht" <nick.he...@zainartech.com> wrote:

Hello,


I'm working on a PyFlink application (1.15, deployed to AWS). I have built a few 
applications and I feel like I'm making good progress, but I now have a 
problem that I think requires multiple stages, each with its own window.
I'm not sure how to properly pass a time attribute into my second stage. I'll share 
some pseudocode that shows what I'm trying to do.




    # First, I have a preformat stage that does some manipulation of the Kinesis stream
    formatted_source=t_env.sql_query(
        """SELECT
            JSON_VALUE(`data`, '$.a') AS `a`,
            JSON_VALUE(`data`, '$.b') AS `b`,
            JSON_VALUE(`data`, '$.response.filtered[0]' RETURNING DOUBLE) AS `x`,
            JSON_VALUE(`data`, '$.response.filtered[1]' RETURNING DOUBLE) AS `y`,
            JSON_VALUE(`data`, '$.response.filtered[2]' RETURNING DOUBLE) AS `z`,
            JSON_VALUE(`data`, '$.response.c' RETURNING INT) AS `c`,
            TO_TIMESTAMP_LTZ(CAST(JSON_VALUE(`data`, '$.request.tracking.time') AS BIGINT), 3) AS `time`,
            CAST(JSON_VALUE(`data`, '$.response.tracking.time') AS BIGINT) AS `v`,
            `proc_time`
        FROM source"""
    )


    # next i have a stage that combines records by (a, b and time)
    first_stage= (
        formatted_source.window(
            Tumble.over(lit(2).seconds).on(col("proc_time")).alias("tumble_window")
        )
        .group_by(col("a"), col("b"), col("time"), col("tumble_window"))
        .select(
            col("a"),
            col("b"),
            col("time"),
            first_process(
                col("a"),
                col("b"),
                col("x"),
                col("y"),
                col("z"),
                col("c"),
                col("time"),
                col("v"),
            ).alias("first_out"),
        )
        .where(col("first_out").is_not_null)
    )


    # I want to create a second stage here, but I have no idea what to pass into the window ".on()"
    second_stage= (
        first_stage.window(
            Slide.over(lit(1).hour).every(lit(2).seconds).on(col("proc_time")).alias("slide_window")
        )
        .group_by(col("a"),col("b"),col("slide_window")).select(
            col("a"),
            col("b"),
            second_process(
                col("a"),
                col("b"),
                col("first_out"),
            ).alias("second_out"),
        )
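    )


    # execute() returns a TableResult; print() writes its rows to stdout.
    # (Chaining .execute().print() onto the assignment would leave second_stage as None.)
    second_stage.execute().print()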


My first stage and SQL parts are working fine. I'm not sure how to pass a 
valid time value into my second stage, because I cannot add it to my first 
stage's group.
Any advice would be appreciated!

Thank you!


Nick Hecht
