Thank you! Looking through that helped me figure out what I needed to do! At first I was nervous because the docs you sent me use the SQL approach and I wanted to use the Python code I had, but then I noticed the example SQL was passing rowtime from one window to another.
I determined that I could use the `col("tumble_window").rowtime` attribute on the window column and alias it so I could use it in the next window. Lastly, I was originally using proctime for my first window and needed to convert over to using a timestamp and watermark so I could call `.rowtime` (I was getting an error when trying to call `.rowtime` on a proctime window). Once I switched those two things I was able to cascade the windows using the Python code (I didn't need to use SQL syntax for this).

Thank you,

Nick

On Sun, Mar 17, 2024 at 9:32 PM Xuyang <xyzhong...@163.com> wrote:
> Hi, Nick.
> Can you try `cascading window aggregation` here[1] if it meets your needs?
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#cascading-window-aggregation
>
> --
> Best!
> Xuyang
>
> At 2024-03-16 06:21:52, "Nick Hecht" <nick.he...@zainartech.com> wrote:
> > Hello,
> >
> > I'm working on a PyFlink application (1.15, deployed to AWS). I have made a few applications and I feel like I'm making good progress, but now I have a different problem that I think requires multiple stages, each with its own window.
> > I'm not sure how I can properly pass time into my second section. I'll share some pseudo code that shows what I'm trying to do.
> > # first i have a preformat stage that does some manipulation of the kinesis stream
> > formatted_source = t_env.sql_query(
> >     """SELECT
> >         JSON_VALUE(`data`, '$.a') AS `a`,
> >         JSON_VALUE(`data`, '$.b') AS `b`,
> >         JSON_VALUE(`data`, '$.response.filtered[0]' RETURNING DOUBLE) AS `x`,
> >         JSON_VALUE(`data`, '$.response.filtered[1]' RETURNING DOUBLE) AS `y`,
> >         JSON_VALUE(`data`, '$.response.filtered[2]' RETURNING DOUBLE) AS `z`,
> >         JSON_VALUE(`data`, '$.response.c' RETURNING INT) AS `c`,
> >         TO_TIMESTAMP_LTZ(CAST(JSON_VALUE(`data`, '$.request.tracking.time') AS BIGINT), 3) AS `time`,
> >         CAST(JSON_VALUE(`data`, '$.response.tracking.time') AS BIGINT) AS `v`,
> >         `proc_time`
> >     FROM source"""
> > )
> >
> > # next i have a stage that combines records by (a, b and time)
> > first_stage = (
> >     formatted_source.window(
> >         Tumble.over(lit(2).seconds).on(col("proc_time")).alias("tumble_window")
> >     )
> >     .group_by(col("a"), col("b"), col("time"), col("tumble_window"))
> >     .select(
> >         col("a"),
> >         col("b"),
> >         col("time"),
> >         first_process(
> >             col("a"),
> >             col("b"),
> >             col("x"),
> >             col("y"),
> >             col("z"),
> >             col("c"),
> >             col("time"),
> >             col("v"),
> >         ).alias("first_out"),
> >     )
> >     .where(col("first_out").is_not_null)
> > )
> >
> > # I want to create a second stage here but i have no idea what to pass into the window ".on()"
> > second_stage = (
> >     first_stage.window(
> >         Slide.over(lit(1).hour).every(lit(2).seconds).on(col("proc_time")).alias("slide_window")
> >     )
> >     .group_by(col("a"), col("b"), col("slide_window"))
> >     .select(
> >         col("a"),
> >         col("b"),
> >         second_process(
> >             col("a"),
> >             col("b"),
> >             col("first_out"),
> >         ).alias("second_out"),
> >     )
> > ).execute().print()
> >
> > second_stage.wait()
> >
> > My first stage and SQL parts are working fine. I'm not sure how to pass a valid time value into my second stage because I cannot add it to my first stage group.
> > Any advice would be appreciated!
> >
> > Thank you!
> >
> > Nick Hecht