Hi,

BTW just found this on Calcite: https://calcite.apache.org/docs/stream.html#sliding-windows

I think this is precisely what I was trying to do with Beam SQL and the syntax is also very intuitive.

Could this be added to SQL roadmap? How hard it is for implementation?

Best

Piotr

On 31.05.2023 20:29, Kenneth Knowles wrote:
1. Yes, using state is a better fit than Beam windowing. You will want to use the new feature of annotating the DoFn with @RequiresTimeSortedInput. This will make it so you can be sure you are actually getting the "previous" event. They can arrive in any order without this annotation. You won't be able to do this in SQL. I don't think Beam SQL has implementations of analytic functions that have this ability.

Kenn

On Wed, May 31, 2023 at 4:17 AM Wiśniowski Piotr <contact.wisniowskipi...@gmail.com> wrote:

    Hi Kenn,

    Thanks for clarification.

    1. Just to put an example in front - for every event that comes in
    I need to find corresponding previous event of same user_id and
    pass previous_event_timestamp in the current event payload down
    (and also current event becomes previous event for future events
    that come in for same user). Question is how to do it with
    BeamSQL. I am aware that analytic windowing (like last_value over
    etc.) might not be a way for streaming and I am ok with this - it
    make sense under the hood just as You mention.

    The task is to be able to keep a simple state in streaming SQL.
    What I did come up with is using sliding window to have this state
    available for each new event that comes in.

    ```

    WITH
    unbounded_stream_initialized AS (
        SELECT
            user_id,
            event_time
        FROM unbounded_stream
        GROUP BY
            user_id,
            event_time,
            TUMBLE(event_time,INTERVAL '1' SECONDS)
        UNION ALL
        -- this is needed as first session window by default starts at
    first element, while here we need to start it in the past
        -- so that there is a window that ends just after first real
    element
        SELECT
            CAST(0 AS BIGINT) AS user_id,
            CAST('2023-05-19 08:00:00' AS TIMESTAMP) AS event_time
        FROM UNNEST(ARRAY[0]) AS arr -- this is dummy as syntax does
    not allow to have GROUP BY just after SELECT
        GROUP BY TUMBLE(CAST('2023-05-19 08:00:00' AS TIMESTAMP),
    INTERVAL '1' SECONDS)
    ),
    test_data_1 AS (
        SELECT
            user_id,
            MAX(event_time) AS prev_event_time,
            HOP_END(event_time, INTERVAL '1' SECONDS, INTERVAL '7'
    DAYS) AS window_end_at
        FROM unbounded_stream_initialized
        GROUP BY
            user_id,
            HOP(
                -- first create a sliding window to aggregate state
                event_time,
                INTERVAL '1' SECONDS,
                INTERVAL '7' DAYS -- The idea is to have this quite
    long compared to interval
            )
    ),
    test_data_1_lookup AS (
        SELECT
            user_id,
            prev_event_time
        FROM test_data_1
        GROUP BY
            user_id,
            -- then re-window into windows suitable for joining main
    stream
            TUMBLE(window_end_at, INTERVAL '1' SECONDS)
    ),
    enriched_info AS (
        SELECT
            unbounded_stream_initialized.event_timestamp AS
    event_timestamp,
            unbounded_stream_initialized.user_id AS user_id,
            test_data_1_lookup.prev_event_time AS prev_event_time
        FROM unbounded_stream_initialized
        LEFT JOIN test_data_1_lookup
            ON unbounded_stream_initialized.user_id =
    test_data_1_lookup.user_id
    )
    SELECT
        *
    FROM enriched_info

    ```

    The doubt that I have is whether above will not store too much
    redundant data as `test_data_1` suggests it could duplicate and
    store each incoming msg into all windows there are in the sliding
    window definition (might be a lot in this case). Or actually
    resolving if a message belongs to a window is done later when
    evaluating `LEFT JOIN`? Target DataFlow. I am still learning Beam
    so there might be some core thing that I miss to understand how it
    is processed.

    2. Any hints on implementing FirestoreIOTableProvider? just more
    or less how to do it where to look for important parts etc. It
    seems we would need this functionality.

    3. I will try to report some more interesting findings. If
    possible please prioritize fixing this ROW error.

    Best

    Piotr

    On 26.05.2023 21:36, Kenneth Knowles wrote:
    Just want to clarify that Beam's concept of windowing is really
    an event-time based key, and they are all processed logically
    simultaneously. SQL's concept of windowing function is to sort
    rows and process them linearly. They are actually totally
    different. From your queries it seems you are interested in SQL's
    windowing functions (aka analytic functions).

    I am surprised by the problems with rows, since we have used them
    extensively. Hopefully it is not too hard to fix. Same with the
    UNION ALL problem.

    And for the CROSS JOIN it would be a nice feature to allow in
    some cases it seems. Should not be hard.

    Thank you for reporting this! If you have time it would be really
    great to get each of these reproducible problems into GitHub
    issues, each.

    Kenn

    On Fri, May 26, 2023 at 11:06 AM Wiśniowski Piotr
    <contact.wisniowskipi...@gmail.com> wrote:

        Hi Alexey,

        Thank You for reference to that discussion I do actually have
        pretty similar thoughts on what Beam SQL needs.

        Update from my side:

        Actually did find a workaround for issue with windowing
        function on stream. It basically boils down to using sliding
        window to collect and aggregate the state. But would need an
        advice if this is actually a cost efficient method (targeting
        DataFlow runner). The doubt that I have is that this sliding
        window would need to have sliding interval less than 1s and
        size more than a week and be feed with quire frequent data.
        If I do understand this correctly - it would mean each input
        row would need to be duplicated for each window and stored
        which could be quite significant storage cost?

        Or actually Beam does not physically duplicate the record but
        just tracks to which windows the record currently belongs?


        And the real issue that BeamSQL needs at the moment in my
        opinion is fixing bugs.

        Some bugs that I found that prevent one from using it and
        would really appreciate fast fix:

        - UNNEST ARRAY with a nested ROW (described below, created
        ticket - https://github.com/apache/beam/issues/26911)

        - PubSub table provider actually requires all table
        properties to be there (with null in `timestampAttributeKey`
        it fails) - which essentially does not allow one to use
        pubsub publish timestamp as `timestampAttributeKey`.

        - its not possible to cast VARCHAR to BYTES. And BYTES is
        needed for DataStoreV1TableProvider to provide a key for
        storage. Also consider updating
        `org.apache.beam.sdk.io.gcp.datastore.RowToEntity` so that it
        requires VARCHAR instead of BYTES - its even easier in
        implementation.

        - Any hints on how to implement `FireStoreIOTableProvider`? I
        am considering implementing it and contributing depending on
        my team decision - but would like to get like idea how hard
        this task is.

        Will create tickets for the rest of issues when I will have
        some spare time.

        Best regards

        Wiśniowski Piotr


        On 22.05.2023 18:28, Alexey Romanenko wrote:
        Hi Piotr,

        Thanks for details! I cross-post this to dev@ as well since,
        I guess, people there can provide more insights on this.

        A while ago, I faced the similar issues trying to run Beam
        SQL against TPC-DS benchmark.
        We had a discussion around that [1], please, take a look
        since it can be helpful.

        [1]
        https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b

        —
        Alexey

        On 18 May 2023, at 11:36, Wiśniowski Piotr
        <contact.wisniowskipi...@gmail.com>
        <mailto:contact.wisniowskipi...@gmail.com> wrote:

        HI,

        After experimenting with Beam SQL I did find some
        limitations. Testing on near latest main (precisely
        `5aad2467469fafd2ed2dd89012bc80c0cd76b168`) with Calcite,
        direct runner and openjdk version "11.0.19". Please let me
        know if some of them are known/ worked on/ have tickets or
        have estimated fix time. I believe most of them are low
        hanging fruits or just my thinking is not right for the
        problem. If this is the case please guide me to some
        working solution.

         From my perspective it is ok to have a fix just on master
        - no need to wait for release. Priority order:
        - 7. Windowing function on a stream - in detail - How to
        get previous message for a key? setting expiration
        arbitrary big is ok, but access to the previous record must
        happen fairly quickly not wait for the big window to finish
        and emit the expired keys. Ideally would like to do it in
        pure beam pipeline as saving to some external key/value
        store and then reading this here could potentially result
        in some race conditions which in I would like to avoid, but
        if its the only option - let it be.
        - 5. single UNION ALL possible
        - 4. UNNEST ARRAY with nested ROW
        - 3. Using * when there is Row type present in the schema
        - 1. `CROSS JOIN` between two unrelated tables is not
        supported - even if one is a static number table
        - 2. ROW construction not supported. It is not possible to
        nest data

        Below queries tat I use to testing this scenarios.

        Thank You for looking at this topics!

        Best

        Wiśniowski Piotr

        -----------------------
        -- 1. `CROSS JOIN` between two unrelated tables is not
        supported.
        -----------------------
        -- Only supported is `CROSS JOIN UNNEST` when exploding
        array from same table.
        -- It is not possible to number rows
        WITHdata_table AS(
        SELECT1ASa
        ),
        number_table AS(
        SELECT
        numbers_exploded ASnumber_item
        FROMUNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16])
        ASnumbers_exploded
        )
        SELECT
        data_table.a,
        number_table.number_item
        FROMdata_table
        CROSS JOINnumber_table
        ;
        -- CROSS JOIN, JOIN ON FALSE is not supported!
        -----------------------
        -- 2. ROW construction not supported. It is not possible to
        nest data
        -----------------------
        SELECTROW(1,2,'a') ASr; -- java.lang.NoSuchFieldException:
        EXPR$0
        SELECT(1,2,'a') ASr; -- java.lang.NoSuchFieldException: EXPR$0
        SELECTMAP['field1',1,'field2','a']; -- Parameters must be
        of the same type
        SELECTMAP['field1','b','field2','a']; -- null
        -- WORKAROUND - manually compose json string,
        -- drawback - decomposing might be not supported or would
        need to be also based on string operations
        SELECT('{"field1":"'||1||'","field2":"'||'a'||'"}')
        AS`json_object`;
        -----------------------
        -- 3. Using * when there is Row type present in the schema
        -----------------------
        CREATEEXTERNALTABLEtest_tmp_1(
        `ref`VARCHAR,
        `author`ROW<
        `name`VARCHAR,
        `email`VARCHAR
        >
        )
        TYPEtext
        LOCATION'python/dbt/tests/using_star_limitation.jsonl'
        TBLPROPERTIES '{"format":"json",
        "deadLetterFile":"top/python/dbt/tests/dead"}';
        SELECT*FROMtest_tmp_1;
        -- java.lang.NoSuchFieldException: name
        -- WORKAROUND - refer to columns explicitly with alias
        SELECT
        `ref`ASref_value,
        test_tmp_1.`author`.`name`ASauthor_name, -- table name must
        be referenced explicitly - this could be fixed too
        test_tmp_1.`author`.`email`ASauthor_name
        FROMtest_tmp_1;
        -----------------------
        -- 4. UNNEST ARRAY with nested ROW
        -----------------------
        CREATEEXTERNALTABLEtest_tmp(
        `ref`VARCHAR,
        `commits`ARRAY<ROW<
        `id`VARCHAR,
        `author`ROW<
        `name`VARCHAR,
        `email`VARCHAR
        >
        >>
        )
        TYPEtext
        LOCATION'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
        TBLPROPERTIES '{"format":"json",
        "deadLetterFile":"python/dbt/tests/dead"}';
        SELECT
        test_tmp.`ref`ASbranch_name,
        commit_item.`id`AScommit_hash,
        commit_item.`author`.`name`ASauthor_name
        FROMtest_tmp
        CROSS JOINUNNEST(test_tmp.commits) AScommit_item;
        -- Row expected 4 fields (Field{name=ref, description=,
        type=STRING, options={{}}}, Field{name=commits,
        description=, type=ARRAY<ROW<id STRING, author ROW<name
        STRING, email STRING>> NOT NULL>, options={{}}},
        Field{name=id, description=, type=STRING, options={{}}},
        Field{name=author, description=, type=ROW<name STRING,
        email STRING>, options={{}}}). initialized with 5 fields.
        -- limited WORKAROUND - refer to array elements by index
        and UNION ALL the items into rows
        -- note workaround that uses number table will not work as
        CROSS JOIN is not supported
        WITHdata_parsed AS(
        SELECT
        test_tmp.`ref`ASbranch_id,
        test_tmp.commits[1].`id`AScommit_hash,
        test_tmp.commits[1].`author`.`name`ASauthor_name
        FROMtest_tmp
        UNION ALL-- this unfortunately works only for two indexes
        SELECT
        test_tmp.`ref`ASbranch_id,
        test_tmp.commits[2].`id`AScommit_hash,
        test_tmp.commits[2].`author`.`name`ASauthor_name
        FROMtest_tmp
        )
        SELECT*
        FROMdata_parsed
        WHEREauthor_name IS NOT NULL
        ;
        -- better WORKAROUND - but tricky to get right (fragile)
        WITHdata_with_number_array AS(
        SELECT
        test_tmp.`ref`ASbranch_name, -- there must be some primary
        key in the data to join on later due to CROSS JOIN support
        limitation
        ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] ASnumber_array,
        CARDINALITY(test_tmp.commits) AScommits_size
        FROMtest_tmp
        ),
        data_with_numbers AS(
        SELECT
        branch_name,
        `EXPR$0`ASnumber_item
        FROMdata_with_number_array
        CROSS JOINUNNEST(data_with_number_array.number_array)
        ASexploded
        WHERE`EXPR$0`<=commits_size
        ),
        data_exploded AS(
        SELECT
        test_tmp.`ref`ASbranch_name,
        test_tmp.commits[data_with_numbers.number_item].`id`AScommit_hash,
        
test_tmp.commits[data_with_numbers.number_item].`author`.`name`ASauthor_name
        FROMtest_tmp
        INNER JOINdata_with_numbers
        ONdata_with_numbers.branch_name =test_tmp.`ref`
        )
        SELECT
        branch_name,
        commit_hash,
        author_name
        FROMdata_exploded
        -- WHERE author_name IS NOT NULL - not possible here due to
        `Non equi-join is not supported`
        -- as it pushes this condition as predicate pushdown to join.
        -- Is there any way to force checking this condition on
        here and not to project it upstream?
        ;
        -----------------------
        -- 5. single UNION ALL possible
        -----------------------
        SELECT1ASa
        UNION ALL
        SELECT2ASa
        UNION ALL
        SELECT3ASa;
        -- Wrong number of arguments to BeamUnionRel:
        org.apache.beam.sdk.values.PCollectionList@70f145ac
        -----------------------
        -- 6. Reserved names
        -----------------------
        -- json_object
        SELECT'{}'ASjson_object;
        -- parse failed: Incorrect syntax near the keyword 'AS' at
        line 1, column 13.
        -- WORKAROUND SELECT '{}' AS `json_object`
        -----------------------
        -- 7. Windowing function on stream
        -----------------------
        -- in detail - How to get previous message for a key?
        -- setting expiration arbitrary big is ok, but access to
        the previous record must happen fairly quickly
        -- not wait for the big window to finish and emit the
        expired keys.
        -- Ideally would like to do it in pure beam pipeline as
        saving to some external key/value store
        -- and then reading this here could potentially result in
        some race conditions which would be hard to debug.
        DROPTABLEIFEXISTSunbounded_stream;
        CREATEEXTERNALTABLEunbounded_stream(
        sequenceBIGINT,
        event_time TIMESTAMP
        )
        TYPE'sequence'
        TBLPROPERTIES '{"elementsPerSecond":1}'
        ;
        CREATEEXTERNALTABLEdata_1_bounded(
        `sequence_nb`BIGINT,
        `sender_login`VARCHAR,
        `user_id`VARCHAR
        )
        TYPEtext
        
LOCATION'python/dbt/tests/sql_functionality_tests/04_get_last_identity/data_1.jsonl'
        TBLPROPERTIES '{"format":"json",
        "deadLetterFile":"python/dbt/tests/dead_letters/dead"}'
        ;
        WITH
        test_data_1_unbounded AS(
        SELECT
        sender_login,
        user_id,
        event_time
        FROMunbounded_stream
        INNER JOINdata_1_bounded
        ONunbounded_stream.sequence =data_1_bounded.sequence_nb
        ),
        test_data_1_lookbehind AS(
        SELECT
        sender_login,
        LAST_VALUE(user_id) OVERprevious_win ASuser_id
        FROMtest_data_1_unbounded
        WINDOWprevious_win AS(
        PARTITIONBYsender_login
        ORDER BYevent_time ASC
        ROWSBETWEEN1PRECEDINGAND1PRECEDING
        )
        )
        SELECT*
        FROMtest_data_1_lookbehind
        LIMIT8
        ;
        -- There are not enough rules to produce a node with
        desired properties: convention=ENUMERABLE. All the inputs
        have relevant nodes, however the cost is still infinite.
        -- Root: rel#29:RelSubset#4.ENUMERABLE
        -- Original rel:
        -- LogicalSort(fetch=[8]): rowcount = 1.2, cumulative cost
        = {12.599999999999998 rows, 35.4 cpu, 9.0 io}, id = 16
        -- LogicalProject(sender_login=[$3],
        user_id=[LAST_VALUE($4) OVER (PARTITION BY $3 ORDER BY $1
        ROWS 1 PRECEDING)]): rowcount = 1.2, cumulative cost =
        {11.399999999999999 rows, 11.4 cpu, 9.0 io}, id = 14
        -- LogicalJoin(condition=[=($0, $2)], joinType=[inner]):
        rowcount = 1.2, cumulative cost = {10.2 rows, 9.0 cpu, 9.0
        io}, id = 12
        -- BeamIOSourceRel(table=[[beam, unbounded_stream]]):
        rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 1.0
        io}, id = 1
        -- BeamIOSourceRel(table=[[beam, data_1_bounded]]):
        rowcount = 8.0, cumulative cost = {8.0 rows, 8.0 cpu, 8.0
        io}, id = 3
-- -- Sets:
        -- Set#0, type: RecordType(BIGINT sequence, TIMESTAMP(6)
        event_time)
        -- rel#18:RelSubset#0.BEAM_LOGICAL, best=rel#1
        -- rel#1:BeamIOSourceRel.BEAM_LOGICAL(table=[beam,
        unbounded_stream]), rowcount=1.0, cumulative cost={1.0
        rows, 1.0 cpu, 1.0 io}
        -- rel#33:RelSubset#0.ENUMERABLE, best=rel#32
        --
        rel#32:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#18),
        rowcount=1.0, cumulative cost={1.7976931348623157E308 rows,
        1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
        -- Set#1, type: RecordType(BIGINT sequence_nb, VARCHAR
        sender_login, VARCHAR user_id)
        -- rel#19:RelSubset#1.BEAM_LOGICAL, best=rel#3
        -- rel#3:BeamIOSourceRel.BEAM_LOGICAL(table=[beam,
        data_1_bounded]), rowcount=8.0, cumulative cost={8.0 rows,
        8.0 cpu, 8.0 io}
        -- rel#36:RelSubset#1.ENUMERABLE, best=rel#35
        --
        rel#35:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#19),
        rowcount=8.0, cumulative cost={1.7976931348623157E308 rows,
        1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
        -- Set#2, type: RecordType(BIGINT sequence, TIMESTAMP(6)
        event_time, BIGINT sequence_nb, VARCHAR sender_login,
        VARCHAR user_id)
        -- rel#21:RelSubset#2.NONE, best=null
        --
        
rel#20:LogicalJoin.NONE(left=RelSubset#18,right=RelSubset#19,condition==($0,
        $2),joinType=inner), rowcount=1.2, cumulative cost={inf}
        -- rel#40:LogicalProject.NONE(input=RelSubset#39,exprs=[$3,
        $4, $0, $1, $2]), rowcount=1.2, cumulative cost={inf}
        --
        
rel#65:LogicalCalc.NONE(input=RelSubset#39,expr#0..4={inputs},0=$t3,1=$t4,2=$t0,3=$t1,4=$t2),
        rowcount=1.2, cumulative cost={inf}


Reply via email to