Thank you! Looking through that helped me figure out what I needed to do!

At first I was nervous because the docs you sent use the SQL approach and I
wanted to keep the Python code I had, but then I noticed that the example SQL
was passing rowtime from one window to another.

I found that I could use the col("tumble_window").rowtime attribute on the
window column and alias it so I could use it in the next window.
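Roughly, the first window ended up looking like this (a trimmed-down sketch
without my UDF calls; event_time and window_time are just the names I'm using
here, and it assumes formatted_source now carries the event-time column
instead of proc_time):

    from pyflink.table.expressions import col, lit
    from pyflink.table.window import Tumble

    # first window on the event-time column; expose the window's rowtime under
    # an alias so the next window has an event-time column to be defined on
    first_stage = (
        formatted_source.window(
            Tumble.over(lit(2).seconds).on(col("event_time")).alias("tumble_window")
        )
        .group_by(col("a"), col("b"), col("tumble_window"))
        .select(
            col("a"),
            col("b"),
            col("tumble_window").rowtime.alias("window_time"),
        )
    )
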
Lastly, I was originally using proctime for my first window and needed to
convert to an event-time timestamp with a watermark so I could call .rowtime;
I was getting an error when trying to call .rowtime on a proctime window.
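That change went into the source table definition: declare the timestamp as an
event-time attribute with a WATERMARK clause instead of relying on proc_time.
Something along these lines (a sketch only; the connector options and the
5-second watermark delay are placeholders, not my exact setup):

    # hypothetical source definition: the JSON path matches my query, but the
    # connector options and the watermark delay below are placeholders
    t_env.execute_sql(
        """
        CREATE TABLE source (
            `data` STRING,
            `event_time` AS TO_TIMESTAMP_LTZ(
                CAST(JSON_VALUE(`data`, '$.request.tracking.time') AS BIGINT), 3),
            WATERMARK FOR `event_time` AS `event_time` - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kinesis'
            -- remaining kinesis connector options omitted
        )
        """
    )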

Once I switched those two things I was able to cascade the windows using the
Python Table API (I didn't need to use SQL syntax for this).
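The second window then just goes .on() the aliased rowtime column from the
first one, continuing the sketch above (again without the UDF):

    from pyflink.table.expressions import col, lit
    from pyflink.table.window import Slide

    # second window defined on the rowtime propagated out of the tumble window
    second_stage = (
        first_stage.window(
            Slide.over(lit(1).hours)
            .every(lit(2).seconds)
            .on(col("window_time"))
            .alias("slide_window")
        )
        .group_by(col("a"), col("b"), col("slide_window"))
        .select(
            col("a"),
            col("b"),
            col("slide_window").end.alias("window_end"),
        )
    )

    second_stage.execute().print()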


Thank you
Nick




On Sun, Mar 17, 2024 at 9:32 PM Xuyang <xyzhong...@163.com> wrote:

> Hi, Nick.
> Can you try `cascading window aggregation` here[1] if it meets your needs?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#cascading-window-aggregation
>
>
> --
>     Best!
>     Xuyang
>
>
> At 2024-03-16 06:21:52, "Nick Hecht" <nick.he...@zainartech.com> wrote:
>
> Hello,
>
> I'm working on a PyFlink application (1.15, deployed to AWS). I have made a
> few applications and I feel like I'm making good progress, but I have a
> different problem that I think requires multiple stages, each with its own
> window.
> I'm not sure how I can properly pass time into my second stage. I'll share
> some pseudocode that shows what I'm trying to do.
>
>
>     # First, I have a preformat stage that does some manipulation of the Kinesis stream
>     formatted_source = t_env.sql_query(
>         """SELECT
>             JSON_VALUE(`data`, '$.a') AS `a`,
>             JSON_VALUE(`data`, '$.b') AS `b`,
>             JSON_VALUE(`data`, '$.response.filtered[0]' RETURNING DOUBLE) AS `x`,
>             JSON_VALUE(`data`, '$.response.filtered[1]' RETURNING DOUBLE) AS `y`,
>             JSON_VALUE(`data`, '$.response.filtered[2]' RETURNING DOUBLE) AS `z`,
>             JSON_VALUE(`data`, '$.response.c' RETURNING INT) AS `c`,
>             TO_TIMESTAMP_LTZ(CAST(JSON_VALUE(`data`, '$.request.tracking.time') AS BIGINT), 3) AS `time`,
>             CAST(JSON_VALUE(`data`, '$.response.tracking.time') AS BIGINT) AS `v`,
>             `proc_time`
>         FROM source"""
>     )
>
>     # Next, I have a stage that combines records by (a, b, and time)
>     first_stage = (
>         formatted_source.window(
>             Tumble.over(lit(2).seconds).on(col("proc_time")).alias("tumble_window")
>         )
>         .group_by(col("a"), col("b"), col("time"), col("tumble_window"))
>         .select(
>             col("a"),
>             col("b"),
>             col("time"),
>             first_process(
>                 col("a"),
>                 col("b"),
>                 col("x"),
>                 col("y"),
>                 col("z"),
>                 col("c"),
>                 col("time"),
>                 col("v"),
>             ).alias("first_out"),
>         )
>         .where(col("first_out").is_not_null)
>     )
>
>     # I want to create a second stage here, but I have no idea what to pass into the window ".on()"
>     second_stage = (
>         first_stage.window(
>             Slide.over(lit(1).hour).every(lit(2).seconds).on(col("proc_time")).alias("slide_window")
>         )
>         )
>         .group_by(col("a"), col("b"), col("slide_window"))
>         .select(
>             col("a"),
>             col("b"),
>             second_process(
>                 col("a"),
>                 col("b"),
>                 col("first_out"),
>             ).alias("second_out"),
>         )
>     ).execute().print()
>
>     second_stage.wait()
>
> My first stage and SQL parts are working fine. I'm not sure how to pass a
> valid time value into my second stage because I cannot add it to my first
> stage's grouping.
> Any advice would be appreciated!
>
> Thank you!
>
> Nick Hecht
>
>
