Hi Piotr,

Thanks for the details! I'm cross-posting this to dev@ as well since, I guess, people there can provide more insight on this.
A while ago, I faced similar issues trying to run Beam SQL against the TPC-DS benchmark. We had a discussion around that [1]; please take a look, since it can be helpful.

[1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b

— Alexey

> On 18 May 2023, at 11:36, Wiśniowski Piotr <contact.wisniowskipi...@gmail.com> wrote:
>
> Hi,
>
> After experimenting with Beam SQL I found some limitations. I am testing on near-latest main (precisely `5aad2467469fafd2ed2dd89012bc80c0cd76b168`) with Calcite, the direct runner, and openjdk version "11.0.19". Please let me know if any of these are known, worked on, have tickets, or have an estimated fix time. I believe most of them are low-hanging fruit, or my thinking is just not right for the problem. If that is the case, please guide me to a working solution.
>
> From my perspective it is OK to have a fix just on master; no need to wait for a release. Priority order:
> - 7. Windowing function on a stream. In detail: how do I get the previous message for a key? Setting the expiration arbitrarily big is OK, but access to the previous record must happen fairly quickly, not wait for the big window to finish and emit the expired keys. Ideally I would like to do it in a pure Beam pipeline, since saving to some external key/value store and then reading it back here could potentially result in race conditions, which I would like to avoid; but if it is the only option, let it be.
> - 5. Only a single UNION ALL is possible.
> - 4. UNNEST ARRAY with nested ROW.
> - 3. Using * when there is a ROW type present in the schema.
> - 1. `CROSS JOIN` between two unrelated tables is not supported, even if one is a static number table.
> - 2. ROW construction is not supported. It is not possible to nest data.
>
> Below are the queries that I use to test these scenarios.
>
> Thank you for looking into these topics!
>
> Best
>
> Wiśniowski Piotr
>
> -----------------------
> -- 1. `CROSS JOIN` between two unrelated tables is not supported.
> -----------------------
> -- Only `CROSS JOIN UNNEST` is supported, when exploding an array from the same table.
> -- It is not possible to number rows.
> WITH data_table AS (
>     SELECT 1 AS a
> ),
> number_table AS (
>     SELECT
>         numbers_exploded AS number_item
>     FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) AS numbers_exploded
> )
> SELECT
>     data_table.a,
>     number_table.number_item
> FROM data_table
> CROSS JOIN number_table
> ;
> -- CROSS JOIN and JOIN ON FALSE are not supported!
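>
> -- WORKAROUND (outside SQL): at the Java SDK level the cross product can be built by
> -- broadcasting the small number table as a side input. A minimal, untested sketch;
> -- `dataTable` and `numbers` are hypothetical PCollections:
> //   imports: java.util.List, org.apache.beam.sdk.transforms.{DoFn,ParDo,View},
> //            org.apache.beam.sdk.values.{KV,PCollection,PCollectionView}
> PCollectionView<List<Integer>> numbersView =
>     numbers.apply(View.asList());                     // broadcast the small table
>
> PCollection<KV<Integer, Integer>> crossed =
>     dataTable.apply(ParDo.of(new DoFn<Integer, KV<Integer, Integer>>() {
>       @ProcessElement
>       public void processElement(ProcessContext c) {
>         for (Integer n : c.sideInput(numbersView)) {  // iterate the broadcast table
>           c.output(KV.of(c.element(), n));            // emit one pair per combination
>         }
>       }
>     }).withSideInputs(numbersView));
> -- This keeps everything in one pipeline, at the cost of stepping out of SQL
> -- (SqlTransform input/output can be mixed with plain PCollection transforms).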
>
> -----------------------
> -- 2. ROW construction is not supported. It is not possible to nest data.
> -----------------------
> SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
> SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
> SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same type
> SELECT MAP['field1','b','field2','a']; -- null
> -- WORKAROUND - manually compose a JSON string.
> -- Drawback: decomposing it might not be supported, or would also need to be based on string operations.
> -- (Constructing nested rows does work at the SDK level; see the sketch after item 4.)
> SELECT ('{"field1":"' || 1 || '","field2":"' || 'a' || '"}') AS `json_object`;
>
> -----------------------
> -- 3. Using * when there is a ROW type present in the schema
> -----------------------
> CREATE EXTERNAL TABLE test_tmp_1(
>     `ref` VARCHAR,
>     `author` ROW<
>         `name` VARCHAR,
>         `email` VARCHAR
>     >
> )
> TYPE text
> LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
> TBLPROPERTIES '{"format":"json", "deadLetterFile":"top/python/dbt/tests/dead"}';
> SELECT * FROM test_tmp_1;
> -- java.lang.NoSuchFieldException: name
> -- WORKAROUND - refer to the columns explicitly, with aliases
> SELECT
>     `ref` AS ref_value,
>     test_tmp_1.`author`.`name` AS author_name, -- the table name must be referenced explicitly - this could be fixed too
>     test_tmp_1.`author`.`email` AS author_email
> FROM test_tmp_1;
>
> -----------------------
> -- 4. UNNEST ARRAY with nested ROW
> -----------------------
> CREATE EXTERNAL TABLE test_tmp(
>     `ref` VARCHAR,
>     `commits` ARRAY<ROW<
>         `id` VARCHAR,
>         `author` ROW<
>             `name` VARCHAR,
>             `email` VARCHAR
>         >
>     >>
> )
> TYPE text
> LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
> TBLPROPERTIES '{"format":"json", "deadLetterFile":"python/dbt/tests/dead"}';
> SELECT
>     test_tmp.`ref` AS branch_name,
>     commit_item.`id` AS commit_hash,
>     commit_item.`author`.`name` AS author_name
> FROM test_tmp
> CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
> -- Row expected 4 fields (Field{name=ref, description=, type=STRING, options={{}}},
> --   Field{name=commits, description=, type=ARRAY<ROW<id STRING, author ROW<name STRING, email STRING>> NOT NULL>, options={{}}},
> --   Field{name=id, description=, type=STRING, options={{}}},
> --   Field{name=author, description=, type=ROW<name STRING, email STRING>, options={{}}}).
> --   initialized with 5 fields.
> -- limited WORKAROUND - refer to the array elements by index and UNION ALL the items into rows.
> -- Note: a workaround that uses a number table will not work, as CROSS JOIN is not supported.
> WITH data_parsed AS (
>     SELECT
>         test_tmp.`ref` AS branch_id,
>         test_tmp.commits[1].`id` AS commit_hash,
>         test_tmp.commits[1].`author`.`name` AS author_name
>     FROM test_tmp
>     UNION ALL -- this unfortunately works only for two indexes
>     SELECT
>         test_tmp.`ref` AS branch_id,
>         test_tmp.commits[2].`id` AS commit_hash,
>         test_tmp.commits[2].`author`.`name` AS author_name
>     FROM test_tmp
> )
> SELECT *
> FROM data_parsed
> WHERE author_name IS NOT NULL
> ;
> -- better WORKAROUND - but tricky to get right (fragile)
> WITH data_with_number_array AS (
>     SELECT
>         test_tmp.`ref` AS branch_name, -- there must be some primary key in the data to join on later, due to the CROSS JOIN support limitation
>         ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] AS number_array,
>         CARDINALITY(test_tmp.commits) AS commits_size
>     FROM test_tmp
> ),
> data_with_numbers AS (
>     SELECT
>         branch_name,
>         `EXPR$0` AS number_item
>     FROM data_with_number_array
>     CROSS JOIN UNNEST(data_with_number_array.number_array) AS exploded
>     WHERE `EXPR$0` <= commits_size
> ),
> data_exploded AS (
>     SELECT
>         test_tmp.`ref` AS branch_name,
>         test_tmp.commits[data_with_numbers.number_item].`id` AS commit_hash,
>         test_tmp.commits[data_with_numbers.number_item].`author`.`name` AS author_name
>     FROM test_tmp
>     INNER JOIN data_with_numbers
>         ON data_with_numbers.branch_name = test_tmp.`ref`
> )
> SELECT
>     branch_name,
>     commit_hash,
>     author_name
> FROM data_exploded
> -- WHERE author_name IS NOT NULL - not possible here due to `Non equi-join is not supported`,
> -- as it pushes this condition as a predicate pushdown to the join.
> -- Is there any way to force checking this condition here and not to project it upstream?
> ;
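>
> -- WORKAROUND (outside SQL): a DoFn that explodes the `commits` array directly, building
> -- the nested output with the Schema API (this also shows that the ROW construction from
> -- item 2 works at the SDK level). A minimal, untested sketch; `testTmp` is a hypothetical
> -- PCollection<Row> read with the schema above:
> //   imports: org.apache.beam.sdk.schemas.Schema, org.apache.beam.sdk.transforms.{DoFn,ParDo},
> //            org.apache.beam.sdk.values.{PCollection,Row}
> Schema outSchema = Schema.builder()
>     .addStringField("branch_name")
>     .addStringField("commit_hash")
>     .addStringField("author_name")
>     .build();
>
> PCollection<Row> exploded = testTmp.apply(
>     ParDo.of(new DoFn<Row, Row>() {
>       @ProcessElement
>       public void processElement(@Element Row branch, OutputReceiver<Row> out) {
>         for (Row commit : branch.<Row>getArray("commits")) {  // one output row per element
>           out.output(Row.withSchema(outSchema)
>               .addValues(
>                   branch.getString("ref"),
>                   commit.getString("id"),
>                   commit.getRow("author").getString("name"))  // navigate the nested ROW
>               .build());
>         }
>       }
>     })).setRowSchema(outSchema);
> -- The `author_name IS NOT NULL` condition can then be a plain Filter.by(...) after this
> -- step, with no predicate-pushdown surprises.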
>
> -----------------------
> -- 5. Only a single UNION ALL is possible
> -----------------------
> SELECT 1 AS a
> UNION ALL
> SELECT 2 AS a
> UNION ALL
> SELECT 3 AS a;
> -- Wrong number of arguments to BeamUnionRel: org.apache.beam.sdk.values.PCollectionList@70f145ac
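>
> -- The error itself points at the SDK equivalent: until BeamUnionRel accepts n-ary input,
> -- the same result can be had in Java by flattening same-schema PCollections. Untested
> -- sketch; `p1`, `p2`, `p3` are hypothetical:
> //   imports: org.apache.beam.sdk.transforms.Flatten,
> //            org.apache.beam.sdk.values.{PCollection,PCollectionList,Row}
> PCollection<Row> unioned =
>     PCollectionList.of(p1).and(p2).and(p3)  // any number of inputs
>         .apply(Flatten.pCollections());
> -- Nesting the unions pairwise in subqueries might also sidestep this on the SQL side,
> -- but I have not verified that.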
>
> -----------------------
> -- 6. Reserved names
> -----------------------
> -- json_object
> SELECT '{}' AS json_object;
> -- parse failed: Incorrect syntax near the keyword 'AS' at line 1, column 13.
> -- WORKAROUND: SELECT '{}' AS `json_object`
>
> -----------------------
> -- 7. Windowing function on a stream
> -----------------------
> -- In detail: how to get the previous message for a key?
> -- Setting the expiration arbitrarily big is OK, but access to the previous record must
> -- happen fairly quickly, not wait for the big window to finish and emit the expired keys.
> -- Ideally I would like to do it in a pure Beam pipeline, since saving to some external
> -- key/value store and then reading it back here could potentially result in race
> -- conditions that would be hard to debug.
> DROP TABLE IF EXISTS unbounded_stream;
> CREATE EXTERNAL TABLE unbounded_stream(
>     sequence BIGINT,
>     event_time TIMESTAMP
> )
> TYPE 'sequence'
> TBLPROPERTIES '{"elementsPerSecond":1}'
> ;
> CREATE EXTERNAL TABLE data_1_bounded(
>     `sequence_nb` BIGINT,
>     `sender_login` VARCHAR,
>     `user_id` VARCHAR
> )
> TYPE text
> LOCATION 'python/dbt/tests/sql_functionality_tests/04_get_last_identity/data_1.jsonl'
> TBLPROPERTIES '{"format":"json", "deadLetterFile":"python/dbt/tests/dead_letters/dead"}'
> ;
> WITH
> test_data_1_unbounded AS (
>     SELECT
>         sender_login,
>         user_id,
>         event_time
>     FROM unbounded_stream
>     INNER JOIN data_1_bounded
>         ON unbounded_stream.sequence = data_1_bounded.sequence_nb
> ),
> test_data_1_lookbehind AS (
>     SELECT
>         sender_login,
>         LAST_VALUE(user_id) OVER previous_win AS user_id
>     FROM test_data_1_unbounded
>     WINDOW previous_win AS (
>         PARTITION BY sender_login
>         ORDER BY event_time ASC
>         ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
>     )
> )
> SELECT *
> FROM test_data_1_lookbehind
> LIMIT 8
> ;
> -- There are not enough rules to produce a node with desired properties: convention=ENUMERABLE.
> -- All the inputs have relevant nodes, however the cost is still infinite.
> -- Root: rel#29:RelSubset#4.ENUMERABLE
> -- Original rel:
> -- LogicalSort(fetch=[8]): rowcount = 1.2, cumulative cost = {12.599999999999998 rows, 35.4 cpu, 9.0 io}, id = 16
> --   LogicalProject(sender_login=[$3], user_id=[LAST_VALUE($4) OVER (PARTITION BY $3 ORDER BY $1 ROWS 1 PRECEDING)]): rowcount = 1.2, cumulative cost = {11.399999999999999 rows, 11.4 cpu, 9.0 io}, id = 14
> --     LogicalJoin(condition=[=($0, $2)], joinType=[inner]): rowcount = 1.2, cumulative cost = {10.2 rows, 9.0 cpu, 9.0 io}, id = 12
> --       BeamIOSourceRel(table=[[beam, unbounded_stream]]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 1.0 io}, id = 1
> --       BeamIOSourceRel(table=[[beam, data_1_bounded]]): rowcount = 8.0, cumulative cost = {8.0 rows, 8.0 cpu, 8.0 io}, id = 3
> --
> -- Sets:
> -- Set#0, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time)
> --   rel#18:RelSubset#0.BEAM_LOGICAL, best=rel#1
> --     rel#1:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, unbounded_stream]), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 1.0 io}
> --   rel#33:RelSubset#0.ENUMERABLE, best=rel#32
> --     rel#32:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#18), rowcount=1.0, cumulative cost={1.7976931348623157E308 rows, 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
> -- Set#1, type: RecordType(BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
> --   rel#19:RelSubset#1.BEAM_LOGICAL, best=rel#3
> --     rel#3:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, data_1_bounded]), rowcount=8.0, cumulative cost={8.0 rows, 8.0 cpu, 8.0 io}
> --   rel#36:RelSubset#1.ENUMERABLE, best=rel#35
> --     rel#35:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#19), rowcount=8.0, cumulative cost={1.7976931348623157E308 rows, 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
> -- Set#2, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time, BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
> --   rel#21:RelSubset#2.NONE, best=null
> --     rel#20:LogicalJoin.NONE(left=RelSubset#18,right=RelSubset#19,condition==($0, $2),joinType=inner), rowcount=1.2, cumulative cost={inf}
> --     rel#40:LogicalProject.NONE(input=RelSubset#39,exprs=[$3, $4, $0, $1, $2]), rowcount=1.2, cumulative cost={inf}
> --     rel#65:LogicalCalc.NONE(input=RelSubset#39,expr#0..4={inputs},0=$t3,1=$t4,2=$t0,3=$t1,4=$t2), rowcount=1.2, cumulative cost={inf}
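>
> -- Restated in Java terms, what I want seems to be per-key state: keep the last value per
> -- key in ValueState and emit the previous one as soon as the next element arrives. A
> -- minimal, untested sketch of a stateful DoFn (field names hypothetical):
> //   imports: org.apache.beam.sdk.state.{StateSpec,StateSpecs,ValueState},
> //            org.apache.beam.sdk.transforms.DoFn, org.apache.beam.sdk.values.KV
> class PreviousValueFn extends DoFn<KV<String, String>, KV<String, String>> {
>   @StateId("last")
>   private final StateSpec<ValueState<String>> lastSpec = StateSpecs.value();
>
>   @ProcessElement
>   public void processElement(
>       @Element KV<String, String> e,              // key: sender_login, value: user_id
>       @StateId("last") ValueState<String> last,
>       OutputReceiver<KV<String, String>> out) {
>     String previous = last.read();                // previous user_id for this key, or null
>     last.write(e.getValue());                     // remember the current one
>     if (previous != null) {
>       out.output(KV.of(e.getKey(), previous));    // available immediately, no window wait
>     }
>   }
> }
> -- State is kept per key (and window), so the previous record is available as soon as the
> -- next element for that key arrives. The open question is whether this can be expressed
> -- through SqlTransform, or only by stepping out of SQL.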