1. Yes, using state is a better fit than Beam windowing. You will want to
use the new feature of annotating the DoFn with @RequiresTimeSortedInput.
This ensures you are actually getting the "previous" event; without the
annotation, elements can arrive in any order. You won't be able to do this
in SQL - I don't think Beam SQL has implementations of analytic functions
with this ability.
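
To sketch what that buys you (a minimal pure-Python simulation, not actual
Beam code; the function name and tuple layout are my own): once input is
guaranteed sorted by event time, a single stored timestamp per key is enough
to emit each event's previous-event time.

```python
def enrich_with_prev_timestamp(events):
    """Simulate a stateful per-key DoFn whose input is time-sorted.

    `events` is an iterable of (user_id, event_time) pairs. The sort by
    event_time stands in for what @RequiresTimeSortedInput guarantees.
    Yields (user_id, event_time, prev_event_time); prev_event_time is
    None for a user's first event.
    """
    last_seen = {}  # stands in for a per-key ValueState
    for user_id, event_time in sorted(events, key=lambda e: e[1]):
        yield (user_id, event_time, last_seen.get(user_id))
        last_seen[user_id] = event_time
```

In Beam Java this would be a stateful DoFn annotated with
@RequiresTimeSortedInput holding something like a ValueState<Instant> per
key; without the annotation, a late or out-of-order element could clobber
the stored "previous" value.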
Kenn
On Wed, May 31, 2023 at 4:17 AM Wiśniowski Piotr <
contact.wisniowskipi...@gmail.com> wrote:
> Hi Kenn,
>
> Thanks for clarification.
>
> 1. Just to put an example in front - for every event that comes in, I need
> to find the corresponding previous event with the same user_id and pass its
> previous_event_timestamp down in the current event payload (the current
> event then becomes the previous event for future events arriving for the
> same user). The question is how to do this with Beam SQL. I am aware that
> analytic windowing (like LAST_VALUE OVER etc.) might not be viable for
> streaming, and I am OK with that - it makes sense under the hood, just as
> you mention.
>
> The task is to keep simple state in streaming SQL. What I came up with is
> using a sliding window to make this state available for each new event that
> comes in.
>
> ```
>
> WITH
> unbounded_stream_initialized AS (
>     SELECT
>         user_id,
>         event_time
>     FROM unbounded_stream
>     GROUP BY
>         user_id,
>         event_time,
>         TUMBLE(event_time, INTERVAL '1' SECONDS)
>     UNION ALL
>     -- This is needed because the first session window by default starts at
>     -- the first element, while here we need to start it in the past so that
>     -- there is a window that ends just after the first real element.
>     SELECT
>         CAST(0 AS BIGINT) AS user_id,
>         CAST('2023-05-19 08:00:00' AS TIMESTAMP) AS event_time
>     -- The UNNEST is a dummy, as the syntax does not allow GROUP BY
>     -- directly after SELECT.
>     FROM UNNEST(ARRAY[0]) AS arr
>     GROUP BY TUMBLE(CAST('2023-05-19 08:00:00' AS TIMESTAMP), INTERVAL '1' SECONDS)
> ),
> test_data_1 AS (
>     SELECT
>         user_id,
>         MAX(event_time) AS prev_event_time,
>         HOP_END(event_time, INTERVAL '1' SECONDS, INTERVAL '7' DAYS) AS window_end_at
>     FROM unbounded_stream_initialized
>     GROUP BY
>         user_id,
>         -- First create a sliding window to aggregate state. The idea is to
>         -- have the window size quite long compared to the slide interval.
>         HOP(event_time, INTERVAL '1' SECONDS, INTERVAL '7' DAYS)
> ),
> test_data_1_lookup AS (
>     SELECT
>         user_id,
>         MAX(prev_event_time) AS prev_event_time
>     FROM test_data_1
>     GROUP BY
>         user_id,
>         -- Then re-window into windows suitable for joining with the main stream.
>         TUMBLE(window_end_at, INTERVAL '1' SECONDS)
> ),
> enriched_info AS (
>     SELECT
>         unbounded_stream_initialized.event_time AS event_time,
>         unbounded_stream_initialized.user_id AS user_id,
>         test_data_1_lookup.prev_event_time AS prev_event_time
>     FROM unbounded_stream_initialized
>     LEFT JOIN test_data_1_lookup
>         ON unbounded_stream_initialized.user_id = test_data_1_lookup.user_id
> )
> SELECT *
> FROM enriched_info
>
> ```
>
> The doubt that I have is whether the above will store too much redundant
> data, since `test_data_1` suggests it could duplicate each incoming message
> into every window in the sliding window definition (which might be a lot in
> this case). Or is resolving which window a message belongs to deferred
> until the `LEFT JOIN` is evaluated? The target runner is Dataflow. I am
> still learning Beam, so there might be some core concept I am missing about
> how this is processed.
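
To quantify the concern above (a back-of-the-envelope check, not a statement
about Beam internals): with HOP(period = 1 second, size = 7 days), each
element logically belongs to size / period overlapping windows, so a plan
that materialized every window assignment would copy each message over half
a million times. Whether a runner actually materializes all of those copies
is runner-dependent.

```python
from datetime import timedelta

slide = timedelta(seconds=1)   # HOP period: INTERVAL '1' SECONDS
size = timedelta(days=7)       # HOP size:   INTERVAL '7' DAYS

# Each element falls into one window per slide step across the window size.
windows_per_element = size // slide
print(windows_per_element)  # 604800 window assignments per incoming message
```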
>
> 2. Any hints on implementing a FirestoreIOTableProvider? Just roughly how
> to approach it and where to look for the important parts, etc. It seems we
> will need this functionality.
>
> 3. I will try to report some more interesting findings. If possible,
> please prioritize fixing the ROW error.
>
> Best
>
> Piotr
>
> On 26.05.2023 21:36, Kenneth Knowles wrote:
>
> Just want to clarify that Beam's concept of windowing is really an
> event-time-based key, and all windows are processed logically
> simultaneously. SQL's concept of a windowing function is to sort rows and
> process them linearly. The two are actually totally different. From your
> queries, it seems you are interested in SQL's windowing functions (aka
> analytic functions).
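
To illustrate the difference in plain Python (a toy sketch, not full Beam or
SQL semantics; the sample data is made up): Beam-style windowing adds a time
bucket to the grouping key, while a SQL analytic function like
LAG(event_time) OVER (PARTITION BY user_id ORDER BY event_time) is a sorted
linear scan per partition.

```python
from collections import defaultdict

events = [("a", 1), ("b", 2), ("a", 5), ("a", 7)]  # (user_id, event_time)

# Beam-style windowing: the (3-unit fixed) window becomes part of the key,
# and each window's group can be processed independently, "simultaneously".
by_window = defaultdict(list)
for user, t in events:
    by_window[(user, t // 3)].append(t)

# SQL analytic-function style: sort rows, then scan linearly, carrying the
# previous row's value per partition (like LAG(event_time)).
prev, lagged = {}, []
for user, t in sorted(events, key=lambda e: e[1]):
    lagged.append((user, t, prev.get(user)))
    prev[user] = t
```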
>
> I am surprised by the problems with rows, since we have used them
> extensively. Hopefully it is not too hard to fix. Same with the UNION ALL
> problem.
>
> And allowing CROSS JOIN in some cases seems like it would be a nice
> feature. It should not be hard.
>
> Thank you for reporting this! If you have time, it would be really great
> to file each of these reproducible problems as a separate GitHub issue.
>
> Kenn
>
> On Fri, May 26, 2023 at 11:06 AM Wiśniowski Piotr <
> contact.wisniowskipi...@gmail.com> wrote:
>
>>