Hello,
I'm working on a PyFlink application (1.15, deployed to AWS). I have built a
few applications and feel like I'm making good progress, but I now have a
different problem that I think requires multiple stages, each with its own
window.
I'm not sure how to properly pass a time attribute into my second stage. I'll
share some pseudocode that shows what I'm trying to do.
from pyflink.table.expressions import col, lit
from pyflink.table.window import Slide, Tumble

# t_env, the `source` table, and the first_process / second_process UDFs
# are defined elsewhere and not shown here.

# First, a preformat stage that does some manipulation of the
# Kinesis stream.
formatted_source = t_env.sql_query(
    """SELECT
        JSON_VALUE(`data`, '$.a') AS `a`,
        JSON_VALUE(`data`, '$.b') AS `b`,
        JSON_VALUE(`data`, '$.response.filtered[0]' RETURNING DOUBLE) AS `x`,
        JSON_VALUE(`data`, '$.response.filtered[1]' RETURNING DOUBLE) AS `y`,
        JSON_VALUE(`data`, '$.response.filtered[2]' RETURNING DOUBLE) AS `z`,
        JSON_VALUE(`data`, '$.response.c' RETURNING INT) AS `c`,
        TO_TIMESTAMP_LTZ(
            CAST(JSON_VALUE(`data`, '$.request.tracking.time') AS BIGINT), 3
        ) AS `time`,
        CAST(JSON_VALUE(`data`, '$.response.tracking.time') AS BIGINT) AS `v`,
        `proc_time`
    FROM source"""
)
# Next, a stage that combines records by (a, b, and time).
first_stage = (
    formatted_source.window(
        Tumble.over(lit(2).seconds).on(col("proc_time")).alias("tumble_window")
    )
    .group_by(col("a"), col("b"), col("time"), col("tumble_window"))
    .select(
        col("a"),
        col("b"),
        col("time"),
        first_process(
            col("a"),
            col("b"),
            col("x"),
            col("y"),
            col("z"),
            col("c"),
            col("time"),
            col("v"),
        ).alias("first_out"),
    )
    .where(col("first_out").is_not_null)
)
# I want to create a second stage here, but I have no idea what to pass
# into the window's ".on()". first_stage does not select "proc_time",
# so the column referenced below is only a placeholder.
second_stage = (
    first_stage.window(
        Slide.over(lit(1).hour)
        .every(lit(2).seconds)
        .on(col("proc_time"))
        .alias("slide_window")
    )
    .group_by(col("a"), col("b"), col("slide_window"))
    .select(
        col("a"),
        col("b"),
        second_process(
            col("a"),
            col("b"),
            col("first_out"),
        ).alias("second_out"),
    )
)
# print() returns None, so keep the Table and the execution separate.
second_stage.execute().print()
My first stage and SQL parts are working fine. I'm not sure how to pass a
valid time attribute into my second stage, because I cannot add it to the
group_by of my first stage.
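
As an illustration of the intended data flow only, here is a minimal sketch in plain Python (not PyFlink) of two chained windows, where the first stage stamps each output row with a time derived from its tumbling window and the second stage uses that time for sliding-window assignment. The helper names `tumble` and `slide`, the sample records, the scaled-down window sizes, and the use of `sum` as a stand-in for `first_process` are all hypothetical. In the Table API the analogous step would presumably be selecting a time property of the window alias in the first stage; that is an assumption, not something verified against 1.15.

```python
from collections import defaultdict


def tumble(records, size_ms):
    """First stage: group (key, ts_ms, value) records into tumbling windows.

    Emits one (key, window_time_ms, aggregate) row per key and window.
    window_time_ms is the window end minus 1 ms, so the row still falls
    inside the window that produced it. sum() is a stand-in aggregate.
    """
    groups = defaultdict(list)
    for key, ts_ms, value in records:
        window_end = ts_ms - ts_ms % size_ms + size_ms
        groups[(key, window_end)].append(value)
    return [(key, end - 1, sum(vals)) for (key, end), vals in sorted(groups.items())]


def slide(rows, size_ms, step_ms):
    """Second stage: assign each (key, ts_ms, value) row to every sliding
    window [start, start + size_ms) that contains ts_ms.

    Returns a dict mapping (key, window_start_ms) to the list of values.
    """
    groups = defaultdict(list)
    for key, ts_ms, value in rows:
        start = ts_ms - ts_ms % step_ms  # latest window start at or before ts_ms
        while start > ts_ms - size_ms:
            groups[(key, start)].append(value)
            start -= step_ms
    return dict(groups)


# Hypothetical sample input: (key, timestamp in ms, value).
key = ("a1", "b1")
records = [(key, 500, 1.0), (key, 1500, 2.0), (key, 2500, 4.0)]

# Stage 1: 2-second tumbling windows; each output row carries a window time.
first_rows = tumble(records, size_ms=2000)

# Stage 2: sliding windows keyed on the time carried over from stage 1
# (6 s size and 2 s step here, scaled down from 1 hour / 2 seconds).
windows = slide(first_rows, size_ms=6000, step_ms=2000)
```

The point of the sketch is that the second stage never sees the original record timestamps; it only works because each first-stage row carries a time of its own.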
Any advice would be appreciated!
Thank you!
Nick Hecht