Re: Beam SQL found limitations

2023-05-31 Thread Kenneth Knowles
1. Yes, using state is a better fit than Beam windowing. You will want to
annotate the DoFn with the new @RequiresTimeSortedInput feature. This
ensures you are actually getting the "previous" event; without the
annotation, events can arrive in any order. You won't be able to do this in
SQL. I don't think Beam SQL has implementations of analytic functions that
have this ability.

Kenn

On Wed, May 31, 2023 at 4:17 AM Wiśniowski Piotr <
contact.wisniowskipi...@gmail.com> wrote:

> Hi Kenn,
>
> Thanks for the clarification.
>
> 1. Just to put an example in front - for every event that comes in, I need
> to find the corresponding previous event of the same user_id and pass
> previous_event_timestamp down in the current event payload (and the
> current event also becomes the previous event for future events that come
> in for the same user). The question is how to do this with Beam SQL. I am
> aware that analytic windowing (like LAST_VALUE OVER etc.) might not be
> viable for streaming, and I am OK with this - it makes sense under the
> hood, just as you mention.
>
> The task is to keep a simple state in streaming SQL. What I came up with
> is using a sliding window to have this state available for each new event
> that comes in.
>
> ```
> WITH
> unbounded_stream_initialized AS (
>     SELECT
>         user_id,
>         event_time
>     FROM unbounded_stream
>     GROUP BY
>         user_id,
>         event_time,
>         TUMBLE(event_time, INTERVAL '1' SECONDS)
>     UNION ALL
>     -- this branch is needed, as the first window by default starts at the
>     -- first element, while here we need it to start in the past, so that
>     -- there is a window that ends just after the first real element
>     SELECT
>         CAST(0 AS BIGINT) AS user_id,
>         CAST('2023-05-19 08:00:00' AS TIMESTAMP) AS event_time
>     FROM UNNEST(ARRAY[0]) AS arr -- dummy source; the syntax does not
>                                  -- allow GROUP BY without a FROM clause
>     GROUP BY
>         TUMBLE(CAST('2023-05-19 08:00:00' AS TIMESTAMP), INTERVAL '1' SECONDS)
> ),
> test_data_1 AS (
>     SELECT
>         user_id,
>         MAX(event_time) AS prev_event_time,
>         HOP_END(event_time, INTERVAL '1' SECONDS, INTERVAL '7' DAYS)
>             AS window_end_at
>     FROM unbounded_stream_initialized
>     GROUP BY
>         user_id,
>         HOP(
>             -- first create a sliding window to aggregate state
>             event_time,
>             INTERVAL '1' SECONDS,
>             INTERVAL '7' DAYS -- the idea is to have this quite long
>                               -- compared to the sliding interval
>         )
> ),
> test_data_1_lookup AS (
>     SELECT
>         user_id,
>         MAX(prev_event_time) AS prev_event_time
>     FROM test_data_1
>     GROUP BY
>         user_id,
>         -- then re-window into windows suitable for joining the main stream
>         TUMBLE(window_end_at, INTERVAL '1' SECONDS)
> ),
> enriched_info AS (
>     SELECT
>         unbounded_stream_initialized.event_time AS event_time,
>         unbounded_stream_initialized.user_id AS user_id,
>         test_data_1_lookup.prev_event_time AS prev_event_time
>     FROM unbounded_stream_initialized
>     LEFT JOIN test_data_1_lookup
>         ON unbounded_stream_initialized.user_id = test_data_1_lookup.user_id
> )
> SELECT *
> FROM enriched_info
> ```
>
> The doubt that I have is whether the above will store too much redundant
> data, as `test_data_1` suggests it could duplicate and store each incoming
> message in every window of the sliding window definition (which might be a
> lot in this case). Or is resolving which windows a message belongs to done
> later, when evaluating the `LEFT JOIN`? The target runner is Dataflow. I
> am still learning Beam, so there might be some core thing I am missing
> about how this is processed.
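>
> A rough back-of-the-envelope check of that fan-out (my own sketch, assuming
> the Beam model assigns each element to every sliding window that contains
> it; runners may store this more cleverly):
>
> ```
> import org.joda.time.Duration;
> import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
>
> public class HopFanout {
>   public static void main(String[] args) {
>     // The windowing that HOP(..., INTERVAL '1' SECONDS, INTERVAL '7' DAYS)
>     // corresponds to in the Java SDK.
>     SlidingWindows hop =
>         SlidingWindows.of(Duration.standardDays(7))  // window size
>             .every(Duration.standardSeconds(1));     // sliding interval
>
>     // Windows per element = size / period, i.e. copies of each input row.
>     long fanout = Duration.standardDays(7).getStandardSeconds()
>         / Duration.standardSeconds(1).getStandardSeconds();
>     System.out.println("windows per element: " + fanout); // 604800
>   }
> }
> ```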
>
> 2. Any hints on implementing a FirestoreIOTableProvider? Just more or
> less how to do it, where to look for the important parts, etc. It seems we
> would need this functionality.
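>
> To make the question concrete, this is roughly the skeleton I imagine,
> modeled on the existing providers under
> sdks/java/extensions/sql/.../meta/provider/ in the Beam repo (class and
> package names below are my guesses from a recent Beam version, untested):
>
> ```
> import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
> import org.apache.beam.sdk.extensions.sql.meta.Table;
> import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
>
> // Hypothetical skeleton - Beam has no Firestore table provider today.
> public class FirestoreTableProvider extends InMemoryMetaTableProvider {
>
>   @Override
>   public String getTableType() {
>     // The type name used in CREATE EXTERNAL TABLE ... TYPE 'firestore'.
>     return "firestore";
>   }
>
>   @Override
>   public BeamSqlTable buildBeamSqlTable(Table table) {
>     // A real implementation would return a BeamSqlTable whose
>     // buildIOReader() wraps FirestoreIO reads, buildIOWriter() wraps
>     // FirestoreIO writes, and whose schema comes from table.getSchema().
>     throw new UnsupportedOperationException("Firestore table not implemented yet");
>   }
> }
> ```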
>
> 3. I will try to report some more interesting findings. If possible,
> please prioritize fixing this ROW error.
>
> Best
>
> Piotr
>
> On 26.05.2023 21:36, Kenneth Knowles wrote:
>
> Just want to clarify that Beam's concept of windowing is really an
> event-time-based key, and all windows are processed logically
> simultaneously. SQL's concept of a windowing function is to sort rows and
> process them linearly. They are actually totally different. From your
> queries it seems you are interested in SQL's windowing functions (aka
> analytic functions).
>
> I am surprised by the problems with rows, since we have used them
> extensively. Hopefully it is not too hard to fix. Same with the UNION ALL
> problem.
>
> And allowing CROSS JOIN in some cases seems like it would be a nice
> feature. Should not be hard.
>
> Thank you for reporting this! If you have time, it would be really great
> to get each of these reproducible problems into its own GitHub issue.
>
> Kenn
>
> On Fri, May 26, 2023 at 11:06 AM Wiśniowski Piotr <
> contact.wisniowskipi...@gmail.com> wrote:
>

>> Hi Alexey,
>>
>> Thank you for the reference to that discussion. I actually have pretty
>> similar thoughts on what Beam SQL needs.
>>
>> Update from my side:
>>
>> I actually did find a workaround for the issue with windowing functions
>> on a stream. It basically boils down to using a sliding window to collect
>> and aggregate the state. But I would need advice on whether this is
>> actually a cost-efficient method (targeting the Dataflow runner). The
>> doubt that I have is that this sliding window would need to have a
>> sliding interval of less than 1 second and a size of more than a week,
>> and be fed with quite frequent data. If I understand this correctly, it
>> would mean each input row would need to be duplicated for each window and