It would be interesting to see a design for this. You'll need to partition,
or it won't scale, because the SQL "OVER" clause processes rows linearly in
sorted order in this case. Other than that, it should be a pretty
straightforward implementation using state + timers +
@RequiresTimeSortedInput. Sorting in any other way would be a little more
work, so I'd start by rejecting ORDER BY clauses on columns other than the
event-time column.
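
To make the partition + state + time-sorted idea concrete, here is a minimal
plain-Java model of the semantics. It is not Beam code and uses no Beam API:
the class and field names are hypothetical, the explicit sort stands in for
what @RequiresTimeSortedInput would guarantee, and the HashMap stands in for
per-key state.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of "previous event per key"; hypothetical names, not Beam code. */
final class PreviousEventSketch {

  /** Hypothetical input event: a partition key plus an event-time timestamp. */
  static final class Event {
    final String userId;
    final long eventTimeMillis;

    Event(String userId, long eventTimeMillis) {
      this.userId = userId;
      this.eventTimeMillis = eventTimeMillis;
    }
  }

  /** Output row: the event plus the previous timestamp for the same key (null if none). */
  static final class Enriched {
    final String userId;
    final long eventTimeMillis;
    final Long prevEventTimeMillis;

    Enriched(String userId, long eventTimeMillis, Long prevEventTimeMillis) {
      this.userId = userId;
      this.eventTimeMillis = eventTimeMillis;
      this.prevEventTimeMillis = prevEventTimeMillis;
    }
  }

  static List<Enriched> enrich(List<Event> events) {
    // Stand-in for time-sorted input: process a copy ordered by event time.
    List<Event> sorted = new ArrayList<>(events);
    sorted.sort(Comparator.comparingLong((Event e) -> e.eventTimeMillis));

    // Stand-in for per-key state: one "last seen timestamp" cell per user.
    Map<String, Long> lastSeen = new HashMap<>();
    List<Enriched> out = new ArrayList<>();
    for (Event e : sorted) {
      // put() returns the previous value for the key, or null on first sight.
      Long prev = lastSeen.put(e.userId, e.eventTimeMillis);
      out.add(new Enriched(e.userId, e.eventTimeMillis, prev));
    }
    return out;
  }

  public static void main(String[] args) {
    List<Event> events = new ArrayList<>();
    events.add(new Event("a", 30L)); // arrives first, but is the later event for "a"
    events.add(new Event("b", 20L));
    events.add(new Event("a", 10L));
    for (Enriched r : enrich(events)) {
      System.out.println(r.userId + " " + r.eventTimeMillis + " prev=" + r.prevEventTimeMillis);
    }
  }
}
```

Because each key only ever touches its own state cell, the work splits
cleanly by key, which is why partitioning matters for scaling. Without the
sort, the event for "a" at 30 would be processed before the one at 10 and
would get no previous timestamp.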

Kenn

On Fri, Jun 9, 2023 at 5:06 AM Wiśniowski Piotr <
contact.wisniowskipi...@gmail.com> wrote:

> Hi,
>
> BTW just found this on Calcite:
> https://calcite.apache.org/docs/stream.html#sliding-windows
>
> I think this is precisely what I was trying to do with Beam SQL and the
> syntax is also very intuitive.
>
> Could this be added to the SQL roadmap? How hard would it be to implement?
>
> Best
>
> Piotr
> On 31.05.2023 20:29, Kenneth Knowles wrote:
>
> 1. Yes, using state is a better fit than Beam windowing. You will want to
> use the new feature of annotating the DoFn with @RequiresTimeSortedInput.
> This will make it so you can be sure you are actually getting the
> "previous" event. They can arrive in any order without this annotation. You
> won't be able to do this in SQL. I don't think Beam SQL has implementations
> of analytic functions that have this ability.
>
> Kenn
>
> On Wed, May 31, 2023 at 4:17 AM Wiśniowski Piotr <
> contact.wisniowskipi...@gmail.com> wrote:
>
>> Hi Kenn,
>>
>> Thanks for the clarification.
>>
>> 1. To put an example up front: for every event that comes in, I need
>> to find the corresponding previous event for the same user_id and pass
>> previous_event_timestamp down in the current event's payload (the
>> current event then becomes the previous event for future events of the
>> same user). The question is how to do this with Beam SQL. I am aware that
>> analytic windowing (like last_value over etc.) might not be the way to go
>> for streaming, and I am OK with this; it makes sense under the hood, just
>> as you mention.
>>
>> The task is to be able to keep simple state in streaming SQL. What I
>> came up with is using a sliding window so that this state is available
>> for each new event that comes in.
>>
>> ```
>>
>> WITH
>> unbounded_stream_initialized AS (
>>     SELECT
>>         user_id,
>>         event_time
>>     FROM unbounded_stream
>>     GROUP BY
>>         user_id,
>>         event_time,
>>         TUMBLE(event_time,INTERVAL '1' SECONDS)
>>     UNION ALL
>>     -- this is needed as first session window by default starts at first
>>     -- element, while here we need to start it in the past
>>     -- so that there is a window that ends just after first real element
>>     SELECT
>>         CAST(0 AS BIGINT) AS user_id,
>>         CAST('2023-05-19 08:00:00' AS TIMESTAMP) AS event_time
>>     -- this is dummy as syntax does not allow to have GROUP BY just after SELECT
>>     FROM UNNEST(ARRAY[0]) AS arr
>>     GROUP BY TUMBLE(CAST('2023-05-19 08:00:00' AS TIMESTAMP), INTERVAL '1' SECONDS)
>> ),
>> test_data_1 AS (
>>     SELECT
>>         user_id,
>>         MAX(event_time) AS prev_event_time,
>>         HOP_END(event_time, INTERVAL '1' SECONDS, INTERVAL '7' DAYS) AS window_end_at
>>     FROM unbounded_stream_initialized
>>     GROUP BY
>>         user_id,
>>         HOP(
>>             -- first create a sliding window to aggregate state
>>             event_time,
>>             INTERVAL '1' SECONDS,
>>             -- The idea is to have this quite long compared to interval
>>             INTERVAL '7' DAYS
>>         )
>> ),
>> test_data_1_lookup AS (
>>     SELECT
>>         user_id,
>>         prev_event_time
>>     FROM test_data_1
>>     GROUP BY
>>         user_id,
>>         -- then re-window into windows suitable for joining main stream
>>         TUMBLE(window_end_at, INTERVAL '1' SECONDS)
>> ),
>> enriched_info AS (
>>     SELECT
>>         unbounded_stream_initialized.event_time AS event_timestamp,
>>         unbounded_stream_initialized.user_id AS user_id,
>>         test_data_1_lookup.prev_event_time AS prev_event_time
>>     FROM unbounded_stream_initialized
>>     LEFT JOIN test_data_1_lookup
>>         ON unbounded_stream_initialized.user_id = test_data_1_lookup.user_id
>> )
>> SELECT
>>     *
>> FROM enriched_info
>>
>> ```
>>
>> My doubt is whether the above will store too much redundant data:
>> `test_data_1` suggests that each incoming message could be duplicated and
>> stored in every window of the sliding window definition (which might be a
>> lot in this case). Or is membership of a message in a window actually
>> resolved later, when evaluating the `LEFT JOIN`? The target runner is
>> Dataflow. I am still learning Beam, so there might be some core concept
>> that I am missing about how this is processed.
>>
>> 2. Any hints on implementing FirestoreIOTableProvider? Roughly how to
>> do it, where to look for the important parts, etc. It seems we would need
>> this functionality.
>>
>> 3. I will try to report some more interesting findings. If possible
>> please prioritize fixing this ROW error.
>>
>> Best
>>
>> Piotr
>> On 26.05.2023 21:36, Kenneth Knowles wrote:
>>
>> Just want to clarify that Beam's concept of windowing is really an
>> event-time based key, and they are all processed logically simultaneously.
>> SQL's concept of windowing function is to sort rows and process them
>> linearly. They are actually totally different. From your queries it seems
>> you are interested in SQL's windowing functions (aka analytic functions).
>>
>> I am surprised by the problems with rows, since we have used them
>> extensively. Hopefully it is not too hard to fix. Same with the UNION ALL
>> problem.
>>
>> And for the CROSS JOIN it would be a nice feature to allow in some cases
>> it seems. Should not be hard.
>>
>> Thank you for reporting this! If you have time it would be really great
>> to get each of these reproducible problems into its own GitHub issue.
>>
>> Kenn
>>
>> On Fri, May 26, 2023 at 11:06 AM Wiśniowski Piotr <
>> contact.wisniowskipi...@gmail.com> wrote:
>>
>>> Hi Alexey,
>>>
>>> Thank you for the reference to that discussion. I actually have pretty
>>> similar thoughts on what Beam SQL needs.
>>>
>>> Update from my side:
>>>
>>> I did find a workaround for the issue with the windowing function on a
>>> stream. It basically boils down to using a sliding window to collect and
>>> aggregate the state. But I would need advice on whether this is actually
>>> a cost-efficient method (targeting the Dataflow runner). My doubt is that
>>> this sliding window would need a sliding interval of less than 1s and a
>>> size of more than a week, and would be fed with quite frequent data. If I
>>> understand this correctly, each input row would need to be duplicated for
>>> each window and stored, which could be quite a significant storage cost?
>>>
>>> Or does Beam not physically duplicate the record, but just track which
>>> windows the record currently belongs to?
>>>
>>>
>>> And what Beam SQL really needs at the moment, in my opinion, is bug
>>> fixes.
>>>
>>> Some bugs that I found that prevent one from using it, and for which I
>>> would really appreciate a fast fix:
>>>
>>> - UNNEST ARRAY with a nested ROW (described below, created ticket -
>>> https://github.com/apache/beam/issues/26911)
>>>
>>> - PubSub table provider actually requires all table properties to be
>>> there (with null in `timestampAttributeKey` it fails) - which essentially
>>> does not allow one to use pubsub publish timestamp as
>>> `timestampAttributeKey`.
>>>
>>> - It's not possible to cast VARCHAR to BYTES, and BYTES is needed for
>>> DataStoreV1TableProvider to provide a key for storage. Also consider
>>> updating `org.apache.beam.sdk.io.gcp.datastore.RowToEntity` so that it
>>> requires VARCHAR instead of BYTES; it's even easier to implement.
>>>
>>> - Any hints on how to implement `FireStoreIOTableProvider`? I am
>>> considering implementing and contributing it, depending on my team's
>>> decision, but would like to get an idea of how hard this task is.
>>>
>>> I will create tickets for the rest of the issues when I have some spare
>>> time.
>>>
>>> Best regards
>>>
>>> Wiśniowski Piotr
>>>
>>>
>>> On 22.05.2023 18:28, Alexey Romanenko wrote:
>>>
>>> Hi Piotr,
>>>
>>> Thanks for the details! I am cross-posting this to dev@ as well since, I
>>> guess, people there can provide more insights on this.
>>>
>>> A while ago, I faced similar issues trying to run Beam SQL against the
>>> TPC-DS benchmark.
>>> We had a discussion around that [1]; please take a look, since it may be
>>> helpful.
>>>
>>> [1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b
>>>
>>> —
>>> Alexey
>>>
>>> On 18 May 2023, at 11:36, Wiśniowski Piotr
>>> <contact.wisniowskipi...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> After experimenting with Beam SQL I found some limitations. I am testing
>>> on near-latest main (precisely `5aad2467469fafd2ed2dd89012bc80c0cd76b168`)
>>> with Calcite, the direct runner and openjdk version "11.0.19". Please let
>>> me know if some of them are known, being worked on, have tickets, or have
>>> an estimated fix time. I believe most of them are low-hanging fruit, or my
>>> thinking is just not right for the problem. If that is the case, please
>>> guide me to a working solution.
>>>
>>> From my perspective it is OK to have a fix just on master; no need to
>>> wait for a release. Priority order:
>>> - 7. Windowing function on a stream. In detail: how to get the previous
>>> message for a key? Setting the expiration arbitrarily big is OK, but
>>> access to the previous record must happen fairly quickly, not wait for
>>> the big window to finish and emit the expired keys. Ideally I would like
>>> to do it in a pure Beam pipeline, as saving to some external key/value
>>> store and then reading it here could result in race conditions, which I
>>> would like to avoid; but if it's the only option, let it be.
>>> - 5. single UNION ALL possible
>>> - 4. UNNEST ARRAY with nested ROW
>>> - 3. Using * when there is Row type present in the schema
>>> - 1. `CROSS JOIN` between two unrelated tables is not supported - even
>>> if one is a static number table
>>> - 2. ROW construction not supported. It is not possible to nest data
>>>
>>> Below are the queries that I use for testing these scenarios.
>>>
>>> Thank you for looking at these topics!
>>>
>>> Best
>>>
>>> Wiśniowski Piotr
>>> -----------------------
>>> -- 1. `CROSS JOIN` between two unrelated tables is not supported.
>>> -----------------------
>>> -- Only supported is `CROSS JOIN UNNEST` when exploding array from same
>>> -- table.
>>> -- It is not possible to number rows
>>> WITH data_table AS (
>>> SELECT 1 AS a
>>> ),
>>> number_table AS (
>>> SELECT
>>> numbers_exploded AS number_item
>>> FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) AS
>>> numbers_exploded
>>> )
>>> SELECT
>>> data_table.a,
>>> number_table.number_item
>>> FROM data_table
>>> CROSS JOIN number_table
>>> ;
>>> -- CROSS JOIN, JOIN ON FALSE is not supported!
>>> -----------------------
>>> -- 2. ROW construction not supported. It is not possible to nest data
>>> -----------------------
>>> SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
>>> SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
>>> SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same type
>>> SELECT MAP['field1','b','field2','a']; -- null
>>> -- WORKAROUND - manually compose json string,
>>> -- drawback - decomposing might be not supported or would need to be
>>> -- also based on string operations
>>> SELECT ('{"field1":"' || 1 || '","field2":"' || 'a' || '"}') AS
>>> `json_object`;
>>> -----------------------
>>> -- 3. Using * when there is Row type present in the schema
>>> -----------------------
>>> CREATE EXTERNAL TABLE test_tmp_1(
>>> `ref` VARCHAR,
>>> `author` ROW<
>>> `name` VARCHAR,
>>> `email` VARCHAR
>>> >
>>> )
>>> TYPE text
>>> LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
>>> TBLPROPERTIES '{"format":"json",
>>> "deadLetterFile":"top/python/dbt/tests/dead"}';
>>> SELECT * FROM test_tmp_1;
>>> -- java.lang.NoSuchFieldException: name
>>> -- WORKAROUND - refer to columns explicitly with alias
>>> SELECT
>>> `ref` AS ref_value,
>>> -- table name must be referenced explicitly - this could be fixed too
>>> test_tmp_1.`author`.`name` AS author_name,
>>> test_tmp_1.`author`.`email` AS author_email
>>> FROM test_tmp_1;
>>> -----------------------
>>> -- 4. UNNEST ARRAY with nested ROW
>>> -----------------------
>>> CREATE EXTERNAL TABLE test_tmp(
>>> `ref` VARCHAR,
>>> `commits` ARRAY<ROW<
>>> `id` VARCHAR,
>>> `author` ROW<
>>> `name` VARCHAR,
>>> `email` VARCHAR
>>> >
>>> >>
>>> )
>>> TYPE text
>>> LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
>>> TBLPROPERTIES '{"format":"json",
>>> "deadLetterFile":"python/dbt/tests/dead"}';
>>> SELECT
>>> test_tmp.`ref` AS branch_name,
>>> commit_item.`id` AS commit_hash,
>>> commit_item.`author`.`name` AS author_name
>>> FROM test_tmp
>>> CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
>>> -- Row expected 4 fields (Field{name=ref, description=, type=STRING,
>>> -- options={{}}}, Field{name=commits, description=, type=ARRAY<ROW<id STRING,
>>> -- author ROW<name STRING, email STRING>> NOT NULL>, options={{}}},
>>> -- Field{name=id, description=, type=STRING, options={{}}}, Field{name=author,
>>> -- description=, type=ROW<name STRING, email STRING>, options={{}}}).
>>> -- initialized with 5 fields.
>>> -- limited WORKAROUND - refer to array elements by index and UNION ALL
>>> -- the items into rows
>>> -- note workaround that uses number table will not work as CROSS JOIN is
>>> -- not supported
>>> WITH data_parsed AS (
>>> SELECT
>>> test_tmp.`ref` AS branch_id,
>>> test_tmp.commits[1].`id` AS commit_hash,
>>> test_tmp.commits[1].`author`.`name` AS author_name
>>> FROM test_tmp
>>> UNION ALL -- this unfortunately works only for two indexes
>>> SELECT
>>> test_tmp.`ref` AS branch_id,
>>> test_tmp.commits[2].`id` AS commit_hash,
>>> test_tmp.commits[2].`author`.`name` AS author_name
>>> FROM test_tmp
>>> )
>>> SELECT *
>>> FROM data_parsed
>>> WHERE author_name IS NOT NULL
>>> ;
>>> -- better WORKAROUND - but tricky to get right (fragile)
>>> WITH data_with_number_array AS (
>>> SELECT
>>> -- there must be some primary key in the data to join on later due to
>>> -- CROSS JOIN support limitation
>>> test_tmp.`ref` AS branch_name,
>>> ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] AS number_array,
>>> CARDINALITY(test_tmp.commits) AS commits_size
>>> FROM test_tmp
>>> ),
>>> data_with_numbers AS (
>>> SELECT
>>> branch_name,
>>> `EXPR$0` AS number_item
>>> FROM data_with_number_array
>>> CROSS JOIN UNNEST(data_with_number_array.number_array) AS exploded
>>> WHERE `EXPR$0` <= commits_size
>>> ),
>>> data_exploded AS (
>>> SELECT
>>> test_tmp.`ref` AS branch_name,
>>> test_tmp.commits[data_with_numbers.number_item].`id` AS commit_hash,
>>> test_tmp.commits[data_with_numbers.number_item].`author`.`name` AS
>>> author_name
>>> FROM test_tmp
>>> INNER JOIN data_with_numbers
>>> ON data_with_numbers.branch_name = test_tmp.`ref`
>>> )
>>> SELECT
>>> branch_name,
>>> commit_hash,
>>> author_name
>>> FROM data_exploded
>>> -- WHERE author_name IS NOT NULL - not possible here due to `Non
>>> -- equi-join is not supported`
>>> -- as it pushes this condition as predicate pushdown to join.
>>> -- Is there any way to force checking this condition on here and not to
>>> -- project it upstream?
>>> ;
>>> -----------------------
>>> -- 5. single UNION ALL possible
>>> -----------------------
>>> SELECT 1 AS a
>>> UNION ALL
>>> SELECT 2 AS a
>>> UNION ALL
>>> SELECT 3 AS a;
>>> -- Wrong number of arguments to BeamUnionRel:
>>> -- org.apache.beam.sdk.values.PCollectionList@70f145ac
>>> -----------------------
>>> -- 6. Reserved names
>>> -----------------------
>>> -- json_object
>>> SELECT '{}' AS json_object;
>>> -- parse failed: Incorrect syntax near the keyword 'AS' at line 1,
>>> -- column 13.
>>> -- WORKAROUND SELECT '{}' AS `json_object`
>>> -----------------------
>>> -- 7. Windowing function on stream
>>> -----------------------
>>> -- in detail - How to get previous message for a key?
>>> -- setting expiration arbitrary big is ok, but access to the previous
>>> -- record must happen fairly quickly
>>> -- not wait for the big window to finish and emit the expired keys.
>>> -- Ideally would like to do it in pure beam pipeline as saving to some
>>> -- external key/value store
>>> -- and then reading this here could potentially result in some race
>>> -- conditions which would be hard to debug.
>>> DROP TABLE IF EXISTS unbounded_stream;
>>> CREATE EXTERNAL TABLE unbounded_stream(
>>> sequence BIGINT,
>>> event_time TIMESTAMP
>>> )
>>> TYPE 'sequence'
>>> TBLPROPERTIES '{"elementsPerSecond":1}'
>>> ;
>>> CREATE EXTERNAL TABLE data_1_bounded(
>>> `sequence_nb` BIGINT,
>>> `sender_login` VARCHAR,
>>> `user_id` VARCHAR
>>> )
>>> TYPE text
>>> LOCATION
>>> 'python/dbt/tests/sql_functionality_tests/04_get_last_identity/data_1.jsonl'
>>> TBLPROPERTIES '{"format":"json",
>>> "deadLetterFile":"python/dbt/tests/dead_letters/dead"}'
>>> ;
>>> WITH
>>> test_data_1_unbounded AS (
>>> SELECT
>>> sender_login,
>>> user_id,
>>> event_time
>>> FROM unbounded_stream
>>> INNER JOIN data_1_bounded
>>> ON unbounded_stream.sequence = data_1_bounded.sequence_nb
>>> ),
>>> test_data_1_lookbehind AS (
>>> SELECT
>>> sender_login,
>>> LAST_VALUE(user_id) OVER previous_win AS user_id
>>> FROM test_data_1_unbounded
>>> WINDOW previous_win AS (
>>> PARTITION BY sender_login
>>> ORDER BY event_time ASC
>>> ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
>>> )
>>> )
>>> SELECT *
>>> FROM test_data_1_lookbehind
>>> LIMIT 8
>>> ;
>>> -- There are not enough rules to produce a node with desired properties:
>>> -- convention=ENUMERABLE. All the inputs have relevant nodes, however the cost
>>> -- is still infinite.
>>> -- Root: rel#29:RelSubset#4.ENUMERABLE
>>> -- Original rel:
>>> -- LogicalSort(fetch=[8]): rowcount = 1.2, cumulative cost =
>>> --   {12.599999999999998 rows, 35.4 cpu, 9.0 io}, id = 16
>>> -- LogicalProject(sender_login=[$3], user_id=[LAST_VALUE($4) OVER
>>> --   (PARTITION BY $3 ORDER BY $1 ROWS 1 PRECEDING)]): rowcount = 1.2,
>>> --   cumulative cost = {11.399999999999999 rows, 11.4 cpu, 9.0 io}, id = 14
>>> -- LogicalJoin(condition=[=($0, $2)], joinType=[inner]): rowcount = 1.2,
>>> --   cumulative cost = {10.2 rows, 9.0 cpu, 9.0 io}, id = 12
>>> -- BeamIOSourceRel(table=[[beam, unbounded_stream]]): rowcount = 1.0,
>>> --   cumulative cost = {1.0 rows, 1.0 cpu, 1.0 io}, id = 1
>>> -- BeamIOSourceRel(table=[[beam, data_1_bounded]]): rowcount = 8.0,
>>> --   cumulative cost = {8.0 rows, 8.0 cpu, 8.0 io}, id = 3
>>> --
>>> -- Sets:
>>> -- Set#0, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time)
>>> -- rel#18:RelSubset#0.BEAM_LOGICAL, best=rel#1
>>> -- rel#1:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, unbounded_stream]),
>>> --   rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 1.0 io}
>>> -- rel#33:RelSubset#0.ENUMERABLE, best=rel#32
>>> -- rel#32:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#18),
>>> --   rowcount=1.0, cumulative cost={1.7976931348623157E308 rows,
>>> --   1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
>>> -- Set#1, type: RecordType(BIGINT sequence_nb, VARCHAR sender_login,
>>> --   VARCHAR user_id)
>>> -- rel#19:RelSubset#1.BEAM_LOGICAL, best=rel#3
>>> -- rel#3:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, data_1_bounded]),
>>> --   rowcount=8.0, cumulative cost={8.0 rows, 8.0 cpu, 8.0 io}
>>> -- rel#36:RelSubset#1.ENUMERABLE, best=rel#35
>>> -- rel#35:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#19),
>>> --   rowcount=8.0, cumulative cost={1.7976931348623157E308 rows,
>>> --   1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
>>> -- Set#2, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time,
>>> --   BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
>>> -- rel#21:RelSubset#2.NONE, best=null
>>> -- rel#20:LogicalJoin.NONE(left=RelSubset#18,right=RelSubset#19,condition==($0, $2),joinType=inner),
>>> --   rowcount=1.2, cumulative cost={inf}
>>> -- rel#40:LogicalProject.NONE(input=RelSubset#39,exprs=[$3, $4, $0, $1,
>>> --   $2]), rowcount=1.2, cumulative cost={inf}
>>> -- rel#65:LogicalCalc.NONE(input=RelSubset#39,expr#0..4={inputs},0=$t3,1=$t4,2=$t0,3=$t1,4=$t2),
>>> --   rowcount=1.2, cumulative cost={inf}
>>>
>>>
>>>
>>>
