Hi Kenn,

Thanks for clarification.

1. Just to put an example in front - for every event that comes in I need to find corresponding previous event of same user_id and pass previous_event_timestamp in the current event payload down (and also current event becomes previous event for future events that come in for same user). Question is how to do it with BeamSQL. I am aware that analytic windowing (like last_value over etc.) might not be a way for streaming and I am ok with this - it make sense under the hood just as You mention.

The task is to be able to keep a simple state in streaming SQL. What I did come up with is using sliding window to have this state available for each new event that comes in.

```

WITH
unbounded_stream_initialized AS (
    SELECT
        user_id,
        event_time
    FROM unbounded_stream
    GROUP BY
        user_id,
        event_time,
        TUMBLE(event_time,INTERVAL '1' SECONDS)
    UNION ALL
    -- this is needed as first session window by default starts at first element, while here we need to start it in the past
    -- so that there is a window that ends just after first real element
    SELECT
        CAST(0 AS BIGINT) AS user_id,
        CAST('2023-05-19 08:00:00' AS TIMESTAMP) AS event_time
    FROM UNNEST(ARRAY[0]) AS arr -- this is dummy as syntax does not allow to have GROUP BY just after SELECT     GROUP BY TUMBLE(CAST('2023-05-19 08:00:00' AS TIMESTAMP), INTERVAL '1' SECONDS)
),
test_data_1 AS (
    SELECT
        user_id,
        MAX(event_time) AS prev_event_time,
        HOP_END(event_time, INTERVAL '1' SECONDS, INTERVAL '7' DAYS) AS window_end_at
    FROM unbounded_stream_initialized
    GROUP BY
        user_id,
        HOP(
            -- first create a sliding window to aggregate state
            event_time,
            INTERVAL '1' SECONDS,
            INTERVAL '7' DAYS -- The idea is to have this quite long compared to interval
        )
),
test_data_1_lookup AS (
    SELECT
        user_id,
        prev_event_time
    FROM test_data_1
    GROUP BY
        user_id,
        -- then re-window into windows suitable for joining main stream
        TUMBLE(window_end_at, INTERVAL '1' SECONDS)
),
enriched_info AS (
    SELECT
        unbounded_stream_initialized.event_timestamp AS event_timestamp,
        unbounded_stream_initialized.user_id AS user_id,
        test_data_1_lookup.prev_event_time AS prev_event_time
    FROM unbounded_stream_initialized
    LEFT JOIN test_data_1_lookup
        ON unbounded_stream_initialized.user_id = test_data_1_lookup.user_id
)
SELECT
    *
FROM enriched_info

```

The doubt that I have is whether above will not store too much redundant data as `test_data_1` suggests it could duplicate and store each incoming msg into all windows there are in the sliding window definition (might be a lot in this case). Or actually resolving if a message belongs to a window is done later when evaluating `LEFT JOIN`? Target DataFlow. I am still learning Beam so there might be some core thing that I miss to understand how it is processed.

2. Any hints on implementing FirestoreIOTableProvider? just more or less how to do it where to look for important parts etc. It seems we would need this functionality.

3. I will try to report some more interesting findings. If possible please prioritize fixing this ROW error.

Best

Piotr

On 26.05.2023 21:36, Kenneth Knowles wrote:
Just want to clarify that Beam's concept of windowing is really an event-time based key, and they are all processed logically simultaneously. SQL's concept of windowing function is to sort rows and process them linearly. They are actually totally different. From your queries it seems you are interested in SQL's windowing functions (aka analytic functions).

I am surprised by the problems with rows, since we have used them extensively. Hopefully it is not too hard to fix. Same with the UNION ALL problem.

And for the CROSS JOIN it would be a nice feature to allow in some cases it seems. Should not be hard.

Thank you for reporting this! If you have time it would be really great to get each of these reproducible problems into GitHub issues, each.

Kenn

On Fri, May 26, 2023 at 11:06 AM Wiśniowski Piotr <contact.wisniowskipi...@gmail.com> wrote:

    Hi Alexey,

    Thank You for reference to that discussion I do actually have
    pretty similar thoughts on what Beam SQL needs.

    Update from my side:

    Actually did find a workaround for issue with windowing function
    on stream. It basically boils down to using sliding window to
    collect and aggregate the state. But would need an advice if this
    is actually a cost efficient method (targeting DataFlow runner).
    The doubt that I have is that this sliding window would need to
    have sliding interval less than 1s and size more than a week and
    be feed with quire frequent data. If I do understand this
    correctly - it would mean each input row would need to be
    duplicated for each window and stored which could be quite
    significant storage cost?

    Or actually Beam does not physically duplicate the record but just
    tracks to which windows the record currently belongs?


    And the real issue that BeamSQL needs at the moment in my opinion
    is fixing bugs.

    Some bugs that I found that prevent one from using it and would
    really appreciate fast fix:

    - UNNEST ARRAY with a nested ROW (described below, created ticket
    - https://github.com/apache/beam/issues/26911)

    - PubSub table provider actually requires all table properties to
    be there (with null in `timestampAttributeKey` it fails) - which
    essentially does not allow one to use pubsub publish timestamp as
    `timestampAttributeKey`.

    - its not possible to cast VARCHAR to BYTES. And BYTES is needed
    for DataStoreV1TableProvider to provide a key for storage. Also
    consider updating
    `org.apache.beam.sdk.io.gcp.datastore.RowToEntity` so that it
    requires VARCHAR instead of BYTES - its even easier in implementation.

    - Any hints on how to implement `FireStoreIOTableProvider`? I am
    considering implementing it and contributing depending on my team
    decision - but would like to get like idea how hard this task is.

    Will create tickets for the rest of issues when I will have some
    spare time.

    Best regards

    Wiśniowski Piotr


    On 22.05.2023 18:28, Alexey Romanenko wrote:
    Hi Piotr,

    Thanks for details! I cross-post this to dev@ as well since, I
    guess, people there can provide more insights on this.

    A while ago, I faced the similar issues trying to run Beam SQL
    against TPC-DS benchmark.
    We had a discussion around that [1], please, take a look since it
    can be helpful.

    [1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b

    —
    Alexey

    On 18 May 2023, at 11:36, Wiśniowski Piotr
    <contact.wisniowskipi...@gmail.com>
    <mailto:contact.wisniowskipi...@gmail.com> wrote:

    HI,

    After experimenting with Beam SQL I did find some limitations.
    Testing on near latest main (precisely
    `5aad2467469fafd2ed2dd89012bc80c0cd76b168`) with Calcite, direct
    runner and openjdk version "11.0.19". Please let me know if some
    of them are known/ worked on/ have tickets or have estimated fix
    time. I believe most of them are low hanging fruits or just my
    thinking is not right for the problem. If this is the case
    please guide me to some working solution.

     From my perspective it is ok to have a fix just on master - no
    need to wait for release. Priority order:
    - 7. Windowing function on a stream - in detail - How to get
    previous message for a key? setting expiration arbitrary big is
    ok, but access to the previous record must happen fairly quickly
    not wait for the big window to finish and emit the expired keys.
    Ideally would like to do it in pure beam pipeline as saving to
    some external key/value store and then reading this here could
    potentially result in some race conditions which in I would like
    to avoid, but if its the only option - let it be.
    - 5. single UNION ALL possible
    - 4. UNNEST ARRAY with nested ROW
    - 3. Using * when there is Row type present in the schema
    - 1. `CROSS JOIN` between two unrelated tables is not supported
    - even if one is a static number table
    - 2. ROW construction not supported. It is not possible to nest data

    Below queries tat I use to testing this scenarios.

    Thank You for looking at this topics!

    Best

    Wiśniowski Piotr

    -----------------------
    -- 1. `CROSS JOIN` between two unrelated tables is not supported.
    -----------------------
    -- Only supported is `CROSS JOIN UNNEST` when exploding array
    from same table.
    -- It is not possible to number rows
    WITHdata_table AS(
    SELECT1ASa
    ),
    number_table AS(
    SELECT
    numbers_exploded ASnumber_item
    FROMUNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16])
    ASnumbers_exploded
    )
    SELECT
    data_table.a,
    number_table.number_item
    FROMdata_table
    CROSS JOINnumber_table
    ;
    -- CROSS JOIN, JOIN ON FALSE is not supported!
    -----------------------
    -- 2. ROW construction not supported. It is not possible to nest
    data
    -----------------------
    SELECTROW(1,2,'a') ASr; -- java.lang.NoSuchFieldException: EXPR$0
    SELECT(1,2,'a') ASr; -- java.lang.NoSuchFieldException: EXPR$0
    SELECTMAP['field1',1,'field2','a']; -- Parameters must be of the
    same type
    SELECTMAP['field1','b','field2','a']; -- null
    -- WORKAROUND - manually compose json string,
    -- drawback - decomposing might be not supported or would need
    to be also based on string operations
    SELECT('{"field1":"'||1||'","field2":"'||'a'||'"}') AS`json_object`;
    -----------------------
    -- 3. Using * when there is Row type present in the schema
    -----------------------
    CREATEEXTERNALTABLEtest_tmp_1(
    `ref`VARCHAR,
    `author`ROW<
    `name`VARCHAR,
    `email`VARCHAR
    >
    )
    TYPEtext
    LOCATION'python/dbt/tests/using_star_limitation.jsonl'
    TBLPROPERTIES '{"format":"json",
    "deadLetterFile":"top/python/dbt/tests/dead"}';
    SELECT*FROMtest_tmp_1;
    -- java.lang.NoSuchFieldException: name
    -- WORKAROUND - refer to columns explicitly with alias
    SELECT
    `ref`ASref_value,
    test_tmp_1.`author`.`name`ASauthor_name, -- table name must be
    referenced explicitly - this could be fixed too
    test_tmp_1.`author`.`email`ASauthor_name
    FROMtest_tmp_1;
    -----------------------
    -- 4. UNNEST ARRAY with nested ROW
    -----------------------
    CREATEEXTERNALTABLEtest_tmp(
    `ref`VARCHAR,
    `commits`ARRAY<ROW<
    `id`VARCHAR,
    `author`ROW<
    `name`VARCHAR,
    `email`VARCHAR
    >
    >>
    )
    TYPEtext
    LOCATION'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
    TBLPROPERTIES '{"format":"json",
    "deadLetterFile":"python/dbt/tests/dead"}';
    SELECT
    test_tmp.`ref`ASbranch_name,
    commit_item.`id`AScommit_hash,
    commit_item.`author`.`name`ASauthor_name
    FROMtest_tmp
    CROSS JOINUNNEST(test_tmp.commits) AScommit_item;
    -- Row expected 4 fields (Field{name=ref, description=,
    type=STRING, options={{}}}, Field{name=commits, description=,
    type=ARRAY<ROW<id STRING, author ROW<name STRING, email STRING>>
    NOT NULL>, options={{}}}, Field{name=id, description=,
    type=STRING, options={{}}}, Field{name=author, description=,
    type=ROW<name STRING, email STRING>, options={{}}}). initialized
    with 5 fields.
    -- limited WORKAROUND - refer to array elements by index and
    UNION ALL the items into rows
    -- note workaround that uses number table will not work as CROSS
    JOIN is not supported
    WITHdata_parsed AS(
    SELECT
    test_tmp.`ref`ASbranch_id,
    test_tmp.commits[1].`id`AScommit_hash,
    test_tmp.commits[1].`author`.`name`ASauthor_name
    FROMtest_tmp
    UNION ALL-- this unfortunately works only for two indexes
    SELECT
    test_tmp.`ref`ASbranch_id,
    test_tmp.commits[2].`id`AScommit_hash,
    test_tmp.commits[2].`author`.`name`ASauthor_name
    FROMtest_tmp
    )
    SELECT*
    FROMdata_parsed
    WHEREauthor_name IS NOT NULL
    ;
    -- better WORKAROUND - but tricky to get right (fragile)
    WITHdata_with_number_array AS(
    SELECT
    test_tmp.`ref`ASbranch_name, -- there must be some primary key
    in the data to join on later due to CROSS JOIN support limitation
    ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] ASnumber_array,
    CARDINALITY(test_tmp.commits) AScommits_size
    FROMtest_tmp
    ),
    data_with_numbers AS(
    SELECT
    branch_name,
    `EXPR$0`ASnumber_item
    FROMdata_with_number_array
    CROSS JOINUNNEST(data_with_number_array.number_array) ASexploded
    WHERE`EXPR$0`<=commits_size
    ),
    data_exploded AS(
    SELECT
    test_tmp.`ref`ASbranch_name,
    test_tmp.commits[data_with_numbers.number_item].`id`AScommit_hash,
    test_tmp.commits[data_with_numbers.number_item].`author`.`name`ASauthor_name
    FROMtest_tmp
    INNER JOINdata_with_numbers
    ONdata_with_numbers.branch_name =test_tmp.`ref`
    )
    SELECT
    branch_name,
    commit_hash,
    author_name
    FROMdata_exploded
    -- WHERE author_name IS NOT NULL - not possible here due to `Non
    equi-join is not supported`
    -- as it pushes this condition as predicate pushdown to join.
    -- Is there any way to force checking this condition on here and
    not to project it upstream?
    ;
    -----------------------
    -- 5. single UNION ALL possible
    -----------------------
    SELECT1ASa
    UNION ALL
    SELECT2ASa
    UNION ALL
    SELECT3ASa;
    -- Wrong number of arguments to BeamUnionRel:
    org.apache.beam.sdk.values.PCollectionList@70f145ac
    -----------------------
    -- 6. Reserved names
    -----------------------
    -- json_object
    SELECT'{}'ASjson_object;
    -- parse failed: Incorrect syntax near the keyword 'AS' at line
    1, column 13.
    -- WORKAROUND SELECT '{}' AS `json_object`
    -----------------------
    -- 7. Windowing function on stream
    -----------------------
    -- in detail - How to get previous message for a key?
    -- setting expiration arbitrary big is ok, but access to the
    previous record must happen fairly quickly
    -- not wait for the big window to finish and emit the expired keys.
    -- Ideally would like to do it in pure beam pipeline as saving
    to some external key/value store
    -- and then reading this here could potentially result in some
    race conditions which would be hard to debug.
    DROPTABLEIFEXISTSunbounded_stream;
    CREATEEXTERNALTABLEunbounded_stream(
    sequenceBIGINT,
    event_time TIMESTAMP
    )
    TYPE'sequence'
    TBLPROPERTIES '{"elementsPerSecond":1}'
    ;
    CREATEEXTERNALTABLEdata_1_bounded(
    `sequence_nb`BIGINT,
    `sender_login`VARCHAR,
    `user_id`VARCHAR
    )
    TYPEtext
    
LOCATION'python/dbt/tests/sql_functionality_tests/04_get_last_identity/data_1.jsonl'
    TBLPROPERTIES '{"format":"json",
    "deadLetterFile":"python/dbt/tests/dead_letters/dead"}'
    ;
    WITH
    test_data_1_unbounded AS(
    SELECT
    sender_login,
    user_id,
    event_time
    FROMunbounded_stream
    INNER JOINdata_1_bounded
    ONunbounded_stream.sequence =data_1_bounded.sequence_nb
    ),
    test_data_1_lookbehind AS(
    SELECT
    sender_login,
    LAST_VALUE(user_id) OVERprevious_win ASuser_id
    FROMtest_data_1_unbounded
    WINDOWprevious_win AS(
    PARTITIONBYsender_login
    ORDER BYevent_time ASC
    ROWSBETWEEN1PRECEDINGAND1PRECEDING
    )
    )
    SELECT*
    FROMtest_data_1_lookbehind
    LIMIT8
    ;
    -- There are not enough rules to produce a node with desired
    properties: convention=ENUMERABLE. All the inputs have relevant
    nodes, however the cost is still infinite.
    -- Root: rel#29:RelSubset#4.ENUMERABLE
    -- Original rel:
    -- LogicalSort(fetch=[8]): rowcount = 1.2, cumulative cost =
    {12.599999999999998 rows, 35.4 cpu, 9.0 io}, id = 16
    -- LogicalProject(sender_login=[$3], user_id=[LAST_VALUE($4)
    OVER (PARTITION BY $3 ORDER BY $1 ROWS 1 PRECEDING)]): rowcount
    = 1.2, cumulative cost = {11.399999999999999 rows, 11.4 cpu, 9.0
    io}, id = 14
    -- LogicalJoin(condition=[=($0, $2)], joinType=[inner]):
    rowcount = 1.2, cumulative cost = {10.2 rows, 9.0 cpu, 9.0 io},
    id = 12
    -- BeamIOSourceRel(table=[[beam, unbounded_stream]]): rowcount =
    1.0, cumulative cost = {1.0 rows, 1.0 cpu, 1.0 io}, id = 1
    -- BeamIOSourceRel(table=[[beam, data_1_bounded]]): rowcount =
    8.0, cumulative cost = {8.0 rows, 8.0 cpu, 8.0 io}, id = 3
-- -- Sets:
    -- Set#0, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time)
    -- rel#18:RelSubset#0.BEAM_LOGICAL, best=rel#1
    -- rel#1:BeamIOSourceRel.BEAM_LOGICAL(table=[beam,
    unbounded_stream]), rowcount=1.0, cumulative cost={1.0 rows, 1.0
    cpu, 1.0 io}
    -- rel#33:RelSubset#0.ENUMERABLE, best=rel#32
    --
    rel#32:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#18),
    rowcount=1.0, cumulative cost={1.7976931348623157E308 rows,
    1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
    -- Set#1, type: RecordType(BIGINT sequence_nb, VARCHAR
    sender_login, VARCHAR user_id)
    -- rel#19:RelSubset#1.BEAM_LOGICAL, best=rel#3
    -- rel#3:BeamIOSourceRel.BEAM_LOGICAL(table=[beam,
    data_1_bounded]), rowcount=8.0, cumulative cost={8.0 rows, 8.0
    cpu, 8.0 io}
    -- rel#36:RelSubset#1.ENUMERABLE, best=rel#35
    --
    rel#35:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#19),
    rowcount=8.0, cumulative cost={1.7976931348623157E308 rows,
    1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
    -- Set#2, type: RecordType(BIGINT sequence, TIMESTAMP(6)
    event_time, BIGINT sequence_nb, VARCHAR sender_login, VARCHAR
    user_id)
    -- rel#21:RelSubset#2.NONE, best=null
    --
    rel#20:LogicalJoin.NONE(left=RelSubset#18,right=RelSubset#19,condition==($0,
    $2),joinType=inner), rowcount=1.2, cumulative cost={inf}
    -- rel#40:LogicalProject.NONE(input=RelSubset#39,exprs=[$3, $4,
    $0, $1, $2]), rowcount=1.2, cumulative cost={inf}
    --
    
rel#65:LogicalCalc.NONE(input=RelSubset#39,expr#0..4={inputs},0=$t3,1=$t4,2=$t0,3=$t1,4=$t2),
    rowcount=1.2, cumulative cost={inf}


Reply via email to