It would be interesting to see a design for this. You'll need to partition or it won't scale, because the SQL "OVER" clause in this case is linear and sorted. Other than that, it should be a pretty straightforward implementation using state + timers + @RequiresTimeSortedInput. Sorting in any other way would be a little more work, so I'd start by rejecting ORDER BY clauses on other columns.

Kenn
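A minimal sketch of that state + @RequiresTimeSortedInput approach in the Java SDK, for illustration only: it assumes the events have already been keyed by user_id, the class and state names are invented, and the cleanup timer for expiring idle keys is omitted.

```
// Illustrative sketch only: emits, for each event, the event time of the previous
// event seen for the same key. @RequiresTimeSortedInput guarantees per-key
// event-time order, so "previous" really is the previous event.
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

class PreviousEventTimeFn extends DoFn<KV<String, Instant>, KV<String, KV<Instant, Instant>>> {

  // Per-key state holding the timestamp of the last event seen for this user.
  @StateId("prevEventTime")
  private final StateSpec<ValueState<Instant>> prevEventTimeSpec = StateSpecs.value();

  @RequiresTimeSortedInput
  @ProcessElement
  public void processElement(
      @Element KV<String, Instant> event,
      @StateId("prevEventTime") ValueState<Instant> prevEventTime,
      OutputReceiver<KV<String, KV<Instant, Instant>>> out) {
    Instant previous = prevEventTime.read(); // null for the first event of this key
    Instant current = event.getValue();
    if (previous != null) {
      // (current event time, previous event time) for this user_id
      out.output(KV.of(event.getKey(), KV.of(current, previous)));
    }
    prevEventTime.write(current); // the current event becomes the "previous" one
  }
}
```

The first event per key emits nothing here; in a real pipeline you would also set an event-time timer to garbage-collect state for keys that go idle.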
On Fri, Jun 9, 2023 at 5:06 AM Wiśniowski Piotr <contact.wisniowskipi...@gmail.com> wrote:

> Hi,
>
> BTW, I just found this on Calcite: https://calcite.apache.org/docs/stream.html#sliding-windows
>
> I think this is precisely what I was trying to do with Beam SQL, and the syntax is also very intuitive.
>
> Could this be added to the SQL roadmap? How hard would it be to implement?
>
> Best
>
> Piotr
>
> On 31.05.2023 20:29, Kenneth Knowles wrote:
>
> 1. Yes, using state is a better fit than Beam windowing. You will want to use the new feature of annotating the DoFn with @RequiresTimeSortedInput. This will make it so you can be sure you are actually getting the "previous" event. They can arrive in any order without this annotation. You won't be able to do this in SQL. I don't think Beam SQL has implementations of analytic functions that have this ability.
>
> Kenn
>
> On Wed, May 31, 2023 at 4:17 AM Wiśniowski Piotr <contact.wisniowskipi...@gmail.com> wrote:
>
>> Hi Kenn,
>>
>> Thanks for the clarification.
>>
>> 1. Just to put an example in front - for every event that comes in, I need to find the corresponding previous event for the same user_id and pass previous_event_timestamp down in the current event payload (and the current event also becomes the previous event for future events of the same user). The question is how to do this with Beam SQL. I am aware that analytic windowing (like LAST_VALUE OVER etc.) might not be a way for streaming and I am ok with this - it makes sense under the hood, just as you mention.
>>
>> The task is to be able to keep a simple state in streaming SQL. What I came up with is using a sliding window to have this state available for each new event that comes in.
>>
>> ```
>> WITH
>> unbounded_stream_initialized AS (
>>     SELECT
>>         user_id,
>>         event_time
>>     FROM unbounded_stream
>>     GROUP BY
>>         user_id,
>>         event_time,
>>         TUMBLE(event_time, INTERVAL '1' SECONDS)
>>     UNION ALL
>>     -- this is needed as the first session window by default starts at the first element, while here we need to start it in the past
>>     -- so that there is a window that ends just after the first real element
>>     SELECT
>>         CAST(0 AS BIGINT) AS user_id,
>>         CAST('2023-05-19 08:00:00' AS TIMESTAMP) AS event_time
>>     FROM UNNEST(ARRAY[0]) AS arr -- this is a dummy, as the syntax does not allow GROUP BY directly after SELECT
>>     GROUP BY TUMBLE(CAST('2023-05-19 08:00:00' AS TIMESTAMP), INTERVAL '1' SECONDS)
>> ),
>> test_data_1 AS (
>>     SELECT
>>         user_id,
>>         MAX(event_time) AS prev_event_time,
>>         HOP_END(event_time, INTERVAL '1' SECONDS, INTERVAL '7' DAYS) AS window_end_at
>>     FROM unbounded_stream_initialized
>>     GROUP BY
>>         user_id,
>>         HOP(
>>             -- first create a sliding window to aggregate state
>>             event_time,
>>             INTERVAL '1' SECONDS,
>>             INTERVAL '7' DAYS -- the idea is to have this quite long compared to the interval
>>         )
>> ),
>> test_data_1_lookup AS (
>>     SELECT
>>         user_id,
>>         prev_event_time
>>     FROM test_data_1
>>     GROUP BY
>>         user_id,
>>         -- then re-window into windows suitable for joining with the main stream
>>         TUMBLE(window_end_at, INTERVAL '1' SECONDS)
>> ),
>> enriched_info AS (
>>     SELECT
>>         unbounded_stream_initialized.event_time AS event_timestamp,
>>         unbounded_stream_initialized.user_id AS user_id,
>>         test_data_1_lookup.prev_event_time AS prev_event_time
>>     FROM unbounded_stream_initialized
>>     LEFT JOIN test_data_1_lookup
>>         ON unbounded_stream_initialized.user_id = test_data_1_lookup.user_id
>> )
>> SELECT
>>     *
>> FROM enriched_info
>> ```
>>
>> The doubt that I have is whether the above will store too much redundant data: `test_data_1` suggests it could duplicate and store each incoming message in every window of the sliding window definition (which might be a lot in this case). Or is resolving which windows a message belongs to actually done later, when evaluating the `LEFT JOIN`? The target is Dataflow. I am still learning Beam, so there might be some core thing I am missing about how this is processed.
>>
>> 2. Any hints on implementing FirestoreIOTableProvider? Just roughly how to do it, where to look for the important parts, etc. It seems we will need this functionality.
>>
>> 3. I will try to report some more interesting findings. If possible, please prioritize fixing this ROW error.
>>
>> Best
>>
>> Piotr
>>
>> On 26.05.2023 21:36, Kenneth Knowles wrote:
>>
>> Just want to clarify that Beam's concept of windowing is really an event-time based key, and the windows are all processed logically simultaneously. SQL's concept of a windowing function is to sort rows and process them linearly. They are actually totally different. From your queries it seems you are interested in SQL's windowing functions (aka analytic functions).
>>
>> I am surprised by the problems with rows, since we have used them extensively. Hopefully it is not too hard to fix. Same with the UNION ALL problem.
>>
>> And for the CROSS JOIN, it seems it would be a nice feature to allow in some cases. Should not be hard.
>>
>> Thank you for reporting this! If you have time it would be really great to get each of these reproducible problems into its own GitHub issue.
>>
>> Kenn
>>
>> On Fri, May 26, 2023 at 11:06 AM Wiśniowski Piotr <contact.wisniowskipi...@gmail.com> wrote:
>>
>>> Hi Alexey,
>>>
>>> Thank you for the reference to that discussion. I do actually have pretty similar thoughts on what Beam SQL needs.
>>>
>>> Update from my side:
>>>
>>> I actually did find a workaround for the issue with windowing functions on a stream. It basically boils down to using a sliding window to collect and aggregate the state. But I would need advice on whether this is actually a cost-efficient method (targeting the Dataflow runner). The doubt that I have is that this sliding window would need to have a sliding interval of less than 1 s, a size of more than a week, and be fed with quite frequent data. If I understand this correctly, it would mean each input row would need to be duplicated and stored for each window, which could be a quite significant storage cost?
>>>
>>> Or does Beam actually not physically duplicate the record, but just track which windows the record currently belongs to?
>>>
>>> And the real thing that Beam SQL needs at the moment, in my opinion, is fixing bugs.
>>>
>>> Some bugs that I found that prevent one from using it, and for which I would really appreciate a fast fix:
>>>
>>> - UNNEST ARRAY with a nested ROW (described below, created ticket - https://github.com/apache/beam/issues/26911)
>>>
>>> - The PubSub table provider actually requires all table properties to be present (with null in `timestampAttributeKey` it fails) - which essentially does not allow one to use the Pub/Sub publish timestamp as `timestampAttributeKey`.
>>>
>>> - It is not possible to cast VARCHAR to BYTES, and BYTES is needed for DataStoreV1TableProvider to provide a key for storage. Also consider updating `org.apache.beam.sdk.io.gcp.datastore.RowToEntity` so that it requires VARCHAR instead of BYTES - it is even easier in implementation.
>>>
>>> - Any hints on how to implement `FirestoreIOTableProvider`? I am considering implementing it and contributing, depending on my team's decision - but I would like to get an idea of how hard this task is.
>>>
>>> I will create tickets for the rest of the issues when I have some spare time.
>>>
>>> Best regards
>>>
>>> Wiśniowski Piotr
>>>
>>> On 22.05.2023 18:28, Alexey Romanenko wrote:
>>>
>>> Hi Piotr,
>>>
>>> Thanks for the details! I am cross-posting this to dev@ as well since, I guess, people there can provide more insights on this.
>>>
>>> A while ago, I faced similar issues trying to run Beam SQL against the TPC-DS benchmark. We had a discussion around that [1]; please take a look, since it can be helpful.
>>>
>>> [1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b
>>>
>>> —
>>> Alexey
>>>
>>> On 18 May 2023, at 11:36, Wiśniowski Piotr <contact.wisniowskipi...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> After experimenting with Beam SQL I found some limitations. I am testing on near-latest main (precisely `5aad2467469fafd2ed2dd89012bc80c0cd76b168`) with Calcite, the direct runner, and openjdk version "11.0.19". Please let me know if some of these are known, being worked on, have tickets, or have an estimated fix time. I believe most of them are low-hanging fruit, or my thinking is just not right for the problem - if that is the case, please guide me to some working solution.
>>>
>>> From my perspective it is ok to have a fix just on master - no need to wait for a release. Priority order:
>>> - 7. Windowing function on a stream - in detail: how to get the previous message for a key? Setting the expiration arbitrarily big is ok, but access to the previous record must happen fairly quickly, not wait for the big window to finish and emit the expired keys. Ideally I would like to do it in a pure Beam pipeline, as saving to some external key/value store and then reading it back here could potentially result in race conditions which I would like to avoid, but if it is the only option - let it be.
>>> - 5. Only a single UNION ALL possible
>>> - 4. UNNEST ARRAY with nested ROW
>>> - 3. Using * when there is a Row type present in the schema
>>> - 1. `CROSS JOIN` between two unrelated tables is not supported - even if one is a static number table
>>> - 2. ROW construction not supported. It is not possible to nest data
>>>
>>> Below are the queries that I use to test these scenarios.
>>>
>>> Thank you for looking at these topics!
>>>
>>> Best
>>>
>>> Wiśniowski Piotr
>>>
>>> -----------------------
>>> -- 1. `CROSS JOIN` between two unrelated tables is not supported.
>>> -----------------------
>>> -- Only `CROSS JOIN UNNEST` is supported, when exploding an array from the same table.
>>> -- It is not possible to number rows.
>>> WITH data_table AS (
>>>     SELECT 1 AS a
>>> ),
>>> number_table AS (
>>>     SELECT
>>>         numbers_exploded AS number_item
>>>     FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) AS numbers_exploded
>>> )
>>> SELECT
>>>     data_table.a,
>>>     number_table.number_item
>>> FROM data_table
>>> CROSS JOIN number_table
>>> ;
>>> -- CROSS JOIN, JOIN ON FALSE is not supported!
>>> -----------------------
>>> -- 2. ROW construction not supported. It is not possible to nest data
>>> -----------------------
>>> SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
>>> SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
>>> SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same type
>>> SELECT MAP['field1','b','field2','a']; -- null
>>> -- WORKAROUND - manually compose a JSON string;
>>> -- drawback - decomposing might not be supported, or would also need to be based on string operations
>>> SELECT ('{"field1":"' || 1 || '","field2":"' || 'a' || '"}') AS `json_object`;
>>> -----------------------
>>> -- 3. Using * when there is a Row type present in the schema
>>> -----------------------
>>> CREATE EXTERNAL TABLE test_tmp_1(
>>>     `ref` VARCHAR,
>>>     `author` ROW<
>>>         `name` VARCHAR,
>>>         `email` VARCHAR
>>>     >
>>> )
>>> TYPE text
>>> LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
>>> TBLPROPERTIES '{"format":"json", "deadLetterFile":"top/python/dbt/tests/dead"}';
>>> SELECT * FROM test_tmp_1;
>>> -- java.lang.NoSuchFieldException: name
>>> -- WORKAROUND - refer to the columns explicitly, with an alias
>>> SELECT
>>>     `ref` AS ref_value,
>>>     test_tmp_1.`author`.`name` AS author_name, -- the table name must be referenced explicitly - this could be fixed too
>>>     test_tmp_1.`author`.`email` AS author_email
>>> FROM test_tmp_1;
>>> -----------------------
>>> -- 4. UNNEST ARRAY with nested ROW
>>> -----------------------
>>> CREATE EXTERNAL TABLE test_tmp(
>>>     `ref` VARCHAR,
>>>     `commits` ARRAY<ROW<
>>>         `id` VARCHAR,
>>>         `author` ROW<
>>>             `name` VARCHAR,
>>>             `email` VARCHAR
>>>         >
>>>     >>
>>> )
>>> TYPE text
>>> LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
>>> TBLPROPERTIES '{"format":"json", "deadLetterFile":"python/dbt/tests/dead"}';
>>> SELECT
>>>     test_tmp.`ref` AS branch_name,
>>>     commit_item.`id` AS commit_hash,
>>>     commit_item.`author`.`name` AS author_name
>>> FROM test_tmp
>>> CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
>>> -- Row expected 4 fields (Field{name=ref, description=, type=STRING, options={{}}}, Field{name=commits, description=, type=ARRAY<ROW<id STRING, author ROW<name STRING, email STRING>> NOT NULL>, options={{}}}, Field{name=id, description=, type=STRING, options={{}}}, Field{name=author, description=, type=ROW<name STRING, email STRING>, options={{}}}). initialized with 5 fields.
>>> -- limited WORKAROUND - refer to array elements by index and UNION ALL the items into rows
>>> -- note workaround that uses number table will not work as CROSS JOIN is not supported
>>> WITH data_parsed AS (
>>>     SELECT
>>>         test_tmp.`ref` AS branch_id,
>>>         test_tmp.commits[1].`id` AS commit_hash,
>>>         test_tmp.commits[1].`author`.`name` AS author_name
>>>     FROM test_tmp
>>>     UNION ALL -- this unfortunately works only for two indexes
>>>     SELECT
>>>         test_tmp.`ref` AS branch_id,
>>>         test_tmp.commits[2].`id` AS commit_hash,
>>>         test_tmp.commits[2].`author`.`name` AS author_name
>>>     FROM test_tmp
>>> )
>>> SELECT *
>>> FROM data_parsed
>>> WHERE author_name IS NOT NULL
>>> ;
>>> -- better WORKAROUND - but tricky to get right (fragile)
>>> WITH data_with_number_array AS (
>>>     SELECT
>>>         test_tmp.`ref` AS branch_name, -- there must be some primary key in the data to join on later due to CROSS JOIN support limitation
>>>         ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] AS number_array,
>>>         CARDINALITY(test_tmp.commits) AS commits_size
>>>     FROM test_tmp
>>> ),
>>> data_with_numbers AS (
>>>     SELECT
>>>         branch_name,
>>>         `EXPR$0` AS number_item
>>>     FROM data_with_number_array
>>>     CROSS JOIN UNNEST(data_with_number_array.number_array) AS exploded
>>>     WHERE `EXPR$0` <= commits_size
>>> ),
>>> data_exploded AS (
>>>     SELECT
>>>         test_tmp.`ref` AS branch_name,
>>>         test_tmp.commits[data_with_numbers.number_item].`id` AS commit_hash,
>>>         test_tmp.commits[data_with_numbers.number_item].`author`.`name` AS author_name
>>>     FROM test_tmp
>>>     INNER JOIN data_with_numbers
>>>         ON data_with_numbers.branch_name = test_tmp.`ref`
>>> )
>>> SELECT
>>>     branch_name,
>>>     commit_hash,
>>>     author_name
>>> FROM data_exploded
>>> -- WHERE author_name IS NOT NULL - not possible here due to `Non equi-join is not supported`
>>> -- as it pushes this condition as predicate pushdown to join.
>>> -- Is there any way to force checking this condition on here and not to project it upstream?
>>> ;
>>> -----------------------
>>> -- 5. single UNION ALL possible
>>> -----------------------
>>> SELECT 1 AS a
>>> UNION ALL
>>> SELECT 2 AS a
>>> UNION ALL
>>> SELECT 3 AS a;
>>> -- Wrong number of arguments to BeamUnionRel: org.apache.beam.sdk.values.PCollectionList@70f145ac
>>> -----------------------
>>> -- 6. Reserved names
>>> -----------------------
>>> -- json_object
>>> SELECT '{}' AS json_object;
>>> -- parse failed: Incorrect syntax near the keyword 'AS' at line 1, column 13.
>>> -- WORKAROUND SELECT '{}' AS `json_object`
>>> -----------------------
>>> -- 7. Windowing function on stream
>>> -----------------------
>>> -- in detail - How to get previous message for a key?
>>> -- setting expiration arbitrary big is ok, but access to the previous record must happen fairly quickly
>>> -- not wait for the big window to finish and emit the expired keys.
>>> -- Ideally would like to do it in pure beam pipeline as saving to some external key/value store
>>> -- and then reading this here could potentially result in some race conditions which would be hard to debug.
>>> DROP TABLE IF EXISTS unbounded_stream;
>>> CREATE EXTERNAL TABLE unbounded_stream(
>>>     sequence BIGINT,
>>>     event_time TIMESTAMP
>>> )
>>> TYPE 'sequence'
>>> TBLPROPERTIES '{"elementsPerSecond":1}'
>>> ;
>>> CREATE EXTERNAL TABLE data_1_bounded(
>>>     `sequence_nb` BIGINT,
>>>     `sender_login` VARCHAR,
>>>     `user_id` VARCHAR
>>> )
>>> TYPE text
>>> LOCATION 'python/dbt/tests/sql_functionality_tests/04_get_last_identity/data_1.jsonl'
>>> TBLPROPERTIES '{"format":"json", "deadLetterFile":"python/dbt/tests/dead_letters/dead"}'
>>> ;
>>> WITH
>>> test_data_1_unbounded AS (
>>>     SELECT
>>>         sender_login,
>>>         user_id,
>>>         event_time
>>>     FROM unbounded_stream
>>>     INNER JOIN data_1_bounded
>>>         ON unbounded_stream.sequence = data_1_bounded.sequence_nb
>>> ),
>>> test_data_1_lookbehind AS (
>>>     SELECT
>>>         sender_login,
>>>         LAST_VALUE(user_id) OVER previous_win AS user_id
>>>     FROM test_data_1_unbounded
>>>     WINDOW previous_win AS (
>>>         PARTITION BY sender_login
>>>         ORDER BY event_time ASC
>>>         ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
>>>     )
>>> )
>>> SELECT *
>>> FROM test_data_1_lookbehind
>>> LIMIT 8
>>> ;
>>> -- There are not enough rules to produce a node with desired properties: convention=ENUMERABLE. All the inputs have relevant nodes, however the cost is still infinite.
>>> -- Root: rel#29:RelSubset#4.ENUMERABLE
>>> -- Original rel:
>>> -- LogicalSort(fetch=[8]): rowcount = 1.2, cumulative cost = {12.599999999999998 rows, 35.4 cpu, 9.0 io}, id = 16
>>> --   LogicalProject(sender_login=[$3], user_id=[LAST_VALUE($4) OVER (PARTITION BY $3 ORDER BY $1 ROWS 1 PRECEDING)]): rowcount = 1.2, cumulative cost = {11.399999999999999 rows, 11.4 cpu, 9.0 io}, id = 14
>>> --     LogicalJoin(condition=[=($0, $2)], joinType=[inner]): rowcount = 1.2, cumulative cost = {10.2 rows, 9.0 cpu, 9.0 io}, id = 12
>>> --       BeamIOSourceRel(table=[[beam, unbounded_stream]]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 1.0 io}, id = 1
>>> --       BeamIOSourceRel(table=[[beam, data_1_bounded]]): rowcount = 8.0, cumulative cost = {8.0 rows, 8.0 cpu, 8.0 io}, id = 3
>>> --
>>> -- Sets:
>>> -- Set#0, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time)
>>> --   rel#18:RelSubset#0.BEAM_LOGICAL, best=rel#1
>>> --     rel#1:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, unbounded_stream]), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 1.0 io}
>>> --   rel#33:RelSubset#0.ENUMERABLE, best=rel#32
>>> --     rel#32:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#18), rowcount=1.0, cumulative cost={1.7976931348623157E308 rows, 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
>>> -- Set#1, type: RecordType(BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
>>> --   rel#19:RelSubset#1.BEAM_LOGICAL, best=rel#3
>>> --     rel#3:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, data_1_bounded]), rowcount=8.0, cumulative cost={8.0 rows, 8.0 cpu, 8.0 io}
>>> --   rel#36:RelSubset#1.ENUMERABLE, best=rel#35
>>> --     rel#35:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#19), rowcount=8.0, cumulative cost={1.7976931348623157E308 rows, 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
>>> -- Set#2, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time, BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
>>> --   rel#21:RelSubset#2.NONE, best=null
>>> --     rel#20:LogicalJoin.NONE(left=RelSubset#18,right=RelSubset#19,condition==($0, $2),joinType=inner), rowcount=1.2, cumulative cost={inf}
>>> --     rel#40:LogicalProject.NONE(input=RelSubset#39,exprs=[$3, $4, $0, $1, $2]), rowcount=1.2, cumulative cost={inf}
>>> --     rel#65:LogicalCalc.NONE(input=RelSubset#39,expr#0..4={inputs},0=$t3,1=$t4,2=$t0,3=$t1,4=$t2), rowcount=1.2, cumulative cost={inf}
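As a footnote on the sliding-window storage question raised in the quoted thread: a minimal, illustrative Java sketch (not from the thread; the class name and printed message are made up) of how many windows a 7-day / 1-second sliding window assigns each element to, which is the fan-out behind the cost concern with the HOP-based workaround. Whether a runner materializes those copies or tracks window membership lazily is runner-specific.

```
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.joda.time.Duration;

public class SlidingWindowFanout {
  public static void main(String[] args) {
    // Same shape as HOP(event_time, INTERVAL '1' SECONDS, INTERVAL '7' DAYS) above.
    SlidingWindows windowing =
        SlidingWindows.of(Duration.standardDays(7)).every(Duration.standardSeconds(1));
    // Each element is assigned to size / period windows.
    long assignmentsPerElement =
        windowing.getSize().getMillis() / windowing.getPeriod().getMillis();
    System.out.println("Window assignments per element: " + assignmentsPerElement); // 604800
  }
}
```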