Hi Piotr,

Thanks for the details! I'm cross-posting this to dev@ as well since, I guess, people 
there can provide more insight on this.

A while ago, I faced similar issues trying to run Beam SQL against the TPC-DS 
benchmark. 
We had a discussion around that [1]; please take a look, since it may be 
helpful.

[1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b

—
Alexey 

> On 18 May 2023, at 11:36, Wiśniowski Piotr 
> <contact.wisniowskipi...@gmail.com> wrote:
> 
> Hi,
> 
> After experimenting with Beam SQL I found some limitations. I am testing on a 
> near-latest main (precisely `5aad2467469fafd2ed2dd89012bc80c0cd76b168`) with 
> Calcite, the direct runner and openjdk version "11.0.19". Please let me know 
> if any of these are known, being worked on, have tickets, or have an estimated 
> fix time. I believe most of them are low-hanging fruit, or my thinking about 
> the problem is simply wrong - if that is the case, please point me to a 
> working solution.
> 
> From my perspective it is ok to have a fix just on master - no need to wait 
> for a release. Priority order: 
> - 7. Windowing function on a stream - in detail: how to get the previous 
> message for a key? Setting an arbitrarily big expiration is ok, but access to 
> the previous record must happen fairly quickly, not wait for the big window 
> to finish and emit the expired keys. Ideally I would like to do it in a pure 
> Beam pipeline, since saving to some external key/value store and then reading 
> it back here could result in race conditions which I would like to avoid, 
> but if it is the only option - let it be.
> - 5. Only a single UNION ALL is possible
> - 4. UNNEST ARRAY with nested ROW
> - 3. Using * when there is a ROW type present in the schema
> - 1. `CROSS JOIN` between two unrelated tables is not supported - even if one 
> is a static number table
> - 2. ROW construction is not supported. It is not possible to nest data
> 
> Below are the queries that I use to test these scenarios.
> 
> Thank you for looking into these topics!
> 
> Best
> 
> Wiśniowski Piotr
> 
> -----------------------
> -- 1. `CROSS JOIN` between two unrelated tables is not supported. 
> -----------------------
> -- Only `CROSS JOIN UNNEST` is supported, when exploding an array from the same table.
> -- As a result it is not possible to number rows with a static number table.
> WITH data_table AS (
>     SELECT 1 AS a
> ),
> number_table AS (
>     SELECT 
>         numbers_exploded AS number_item
>     FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) AS 
> numbers_exploded
> )
> SELECT 
>     data_table.a,
>     number_table.number_item
> FROM data_table
> CROSS JOIN number_table
> ;
> -- CROSS JOIN, JOIN ON FALSE is not supported!
> 
> 
> -----------------------
> -- 2. ROW construction not supported. It is not possible to nest data
> -----------------------
> SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0 
> SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0 
> SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same type
> SELECT MAP['field1','b','field2','a']; -- null
> -- WORKAROUND - manually compose a JSON string; 
> -- drawback - decomposing it might not be supported, or would also need to be 
> -- based on string operations
> SELECT ('{"field1":"' || 1 || '","field2":"' || 'a' || '"}') AS `json_object`;
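> 
> -- A possible fallback outside SQL (a minimal sketch, assuming the Java SDK schema API; 
> -- the field names and values below are only illustrative): build the nested row in 
> -- Java with Schema/Row builders and hand it to SqlTransform already nested.
> /*
> import org.apache.beam.sdk.schemas.Schema;
> import org.apache.beam.sdk.values.Row;
> 
> // Nested schema: a record with a plain field and an embedded ROW.
> Schema authorSchema = Schema.builder()
>     .addStringField("name")
>     .addStringField("email")
>     .build();
> Schema recordSchema = Schema.builder()
>     .addStringField("ref")
>     .addRowField("author", authorSchema)
>     .build();
> 
> // Construct the nested value directly instead of ROW(...) in the query.
> Row author = Row.withSchema(authorSchema).addValues("Jane", "jane@example.com").build();
> Row record = Row.withSchema(recordSchema).addValues("refs/heads/main", author).build();
> */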
> 
> 
> -----------------------
> -- 3. Using * when there is Row type present in the schema
> -----------------------
> CREATE EXTERNAL TABLE test_tmp_1(
>     `ref` VARCHAR,
>     `author` ROW<
>         `name` VARCHAR,
>         `email` VARCHAR
>     >
> )
> TYPE text
> LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
> TBLPROPERTIES '{"format":"json", 
> "deadLetterFile":"top/python/dbt/tests/dead"}';
> SELECT * FROM test_tmp_1;
> --  java.lang.NoSuchFieldException: name 
> -- WORKAROUND - refer to columns explicitly with alias
> SELECT 
>     `ref` AS ref_value, 
>     -- the table name must be referenced explicitly - this could be fixed too
>     test_tmp_1.`author`.`name` AS author_name,
>     test_tmp_1.`author`.`email` AS author_email
> FROM test_tmp_1;
> 
> 
> -----------------------
> -- 4. UNNEST ARRAY with nested ROW
> -----------------------
> CREATE EXTERNAL TABLE test_tmp(
>     `ref` VARCHAR,
>     `commits` ARRAY<ROW<
>         `id` VARCHAR,
>         `author` ROW<
>             `name` VARCHAR,
>             `email` VARCHAR
>         >
>     >>
> )
> TYPE text
> LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
> TBLPROPERTIES '{"format":"json", "deadLetterFile":"python/dbt/tests/dead"}';
> SELECT
>     test_tmp.`ref` AS branch_name,
>     commit_item.`id` AS commit_hash,
>     commit_item.`author`.`name` AS author_name
> FROM test_tmp
> CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
> -- Row expected 4 fields (Field{name=ref, description=, type=STRING, options={{}}}, 
> --   Field{name=commits, description=, type=ARRAY<ROW<id STRING, author ROW<name STRING, email STRING>> NOT NULL>, options={{}}}, 
> --   Field{name=id, description=, type=STRING, options={{}}}, 
> --   Field{name=author, description=, type=ROW<name STRING, email STRING>, options={{}}}). initialized with 5 fields.
> -- limited WORKAROUND - refer to array elements by index and UNION ALL the items into rows.
> -- Note that a workaround using a number table will not work, as CROSS JOIN is not supported.
> WITH data_parsed AS (
>     SELECT
>         test_tmp.`ref` AS branch_id,
>         test_tmp.commits[1].`id` AS commit_hash,
>         test_tmp.commits[1].`author`.`name` AS author_name
>     FROM test_tmp
>     UNION ALL -- this unfortunately works only for two indexes
>     SELECT
>         test_tmp.`ref` AS branch_id,
>         test_tmp.commits[2].`id` AS commit_hash,
>         test_tmp.commits[2].`author`.`name` AS author_name
>     FROM test_tmp
> )
> SELECT * 
> FROM data_parsed
> WHERE author_name IS NOT NULL
> ;
> -- better WORKAROUND - but tricky to get right (fragile)
> WITH data_with_number_array AS (
>     SELECT
>         -- there must be some primary key in the data to join on later, 
>         -- due to the CROSS JOIN support limitation
>         test_tmp.`ref` AS branch_name,
>         ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] AS number_array,
>         CARDINALITY(test_tmp.commits) AS commits_size
>     FROM test_tmp
> ),
> data_with_numbers AS (
>     SELECT 
>         branch_name,
>         `EXPR$0` AS number_item
>     FROM data_with_number_array
>     CROSS JOIN UNNEST(data_with_number_array.number_array) AS exploded
>     WHERE `EXPR$0` <= commits_size
> ),
> data_exploded AS (
>     SELECT 
>         test_tmp.`ref` AS branch_name,
>         test_tmp.commits[data_with_numbers.number_item].`id` AS commit_hash,
>         test_tmp.commits[data_with_numbers.number_item].`author`.`name` AS 
> author_name
>     FROM test_tmp
>     INNER JOIN data_with_numbers
>         ON data_with_numbers.branch_name = test_tmp.`ref`
> )
> SELECT
>     branch_name,
>     commit_hash,
>     author_name
> FROM data_exploded
> -- WHERE author_name IS NOT NULL - not possible here due to `Non equi-join is not supported`, 
> -- as it pushes this condition down to the join as a predicate. 
> -- Is there any way to force checking this condition here and not have it 
> -- projected upstream?
> ;
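> 
> -- Another possible WORKAROUND outside SQL (a sketch, assuming the Java SDK schema APIs; 
> -- `testTmp` stands for the PCollection<Row> read from test_tmp and is only illustrative): 
> -- explode the commits array with FlatMapElements before handing the rows to SqlTransform.
> /*
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.beam.sdk.schemas.Schema;
> import org.apache.beam.sdk.transforms.FlatMapElements;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.beam.sdk.values.Row;
> import org.apache.beam.sdk.values.TypeDescriptor;
> 
> // One output row per (branch, commit) pair.
> Schema explodedSchema = Schema.builder()
>     .addStringField("branch_name")
>     .addStringField("commit_hash")
>     .addStringField("author_name")
>     .build();
> 
> PCollection<Row> exploded = testTmp
>     .apply(FlatMapElements.into(TypeDescriptor.of(Row.class))
>         .via((Row r) -> {
>           List<Row> out = new ArrayList<>();
>           for (Row commit : r.<Row>getArray("commits")) {
>             out.add(Row.withSchema(explodedSchema)
>                 .addValues(
>                     r.getString("ref"),
>                     commit.getString("id"),
>                     commit.getRow("author").getString("name"))
>                 .build());
>           }
>           return out;
>         }))
>     .setRowSchema(explodedSchema);
> */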
> 
> 
> -----------------------
> -- 5. Only a single UNION ALL is possible
> -----------------------
> SELECT 1 AS a
> UNION ALL
> SELECT 2 AS a
> UNION ALL
> SELECT 3 AS a;
> -- Wrong number of arguments to BeamUnionRel: org.apache.beam.sdk.values.PCollectionList@70f145ac
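> 
> -- A possible way around this outside SQL (a sketch, assuming the Java SDK; 
> -- `first`, `second`, `third` are illustrative PCollection<Row> names with identical schemas): 
> -- union the branches with Flatten and only then hand the result to SqlTransform.
> /*
> import org.apache.beam.sdk.transforms.Flatten;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.beam.sdk.values.PCollectionList;
> import org.apache.beam.sdk.values.Row;
> 
> // Multi-way UNION ALL expressed as a single Flatten over any number of inputs.
> PCollection<Row> unioned =
>     PCollectionList.of(first).and(second).and(third)
>         .apply(Flatten.pCollections());
> */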
> 
> 
> -----------------------
> -- 6. Reserved names
> -----------------------
> -- json_object
> SELECT '{}' AS json_object; 
> -- parse failed: Incorrect syntax near the keyword 'AS' at line 1, column 13.
> -- WORKAROUND SELECT '{}' AS `json_object`
> 
> -----------------------
> -- 7. Windowing function on a stream
> -----------------------
> -- In detail - how to get the previous message for a key?
> -- Setting an arbitrarily big expiration is ok, but access to the previous record 
> -- must happen fairly quickly, 
> -- not wait for the big window to finish and emit the expired keys. 
> -- Ideally I would like to do it in a pure Beam pipeline, since saving to some 
> -- external key/value store
> -- and then reading it back here could result in race conditions which would be 
> -- hard to debug.
> DROP TABLE IF EXISTS unbounded_stream;
> CREATE EXTERNAL TABLE unbounded_stream(
>     sequence BIGINT,
>     event_time TIMESTAMP
> )
> TYPE 'sequence'
> TBLPROPERTIES '{"elementsPerSecond":1}'
> ;
> CREATE EXTERNAL TABLE data_1_bounded(
>     `sequence_nb` BIGINT,
>     `sender_login` VARCHAR,
>     `user_id` VARCHAR
> )
> TYPE text
> LOCATION 
> 'python/dbt/tests/sql_functionality_tests/04_get_last_identity/data_1.jsonl'
> TBLPROPERTIES '{"format":"json", 
> "deadLetterFile":"python/dbt/tests/dead_letters/dead"}' 
> ;
> WITH
> test_data_1_unbounded AS (
>     SELECT 
>         sender_login,
>         user_id,
>         event_time
>     FROM unbounded_stream
>     INNER JOIN data_1_bounded
>        ON unbounded_stream.sequence = data_1_bounded.sequence_nb
> ),
> test_data_1_lookbehind AS (
>     SELECT 
>         sender_login,
>         LAST_VALUE(user_id) OVER previous_win AS user_id
>     FROM test_data_1_unbounded
>     WINDOW previous_win AS (
>         PARTITION BY sender_login
>         ORDER BY event_time ASC
>         ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
>     )
> )
> SELECT * 
> FROM test_data_1_lookbehind 
> LIMIT 8
> ;
> -- There are not enough rules to produce a node with desired properties: convention=ENUMERABLE. All the inputs have relevant nodes, however the cost is still infinite.
> -- Root: rel#29:RelSubset#4.ENUMERABLE
> -- Original rel:
> -- LogicalSort(fetch=[8]): rowcount = 1.2, cumulative cost = {12.599999999999998 rows, 35.4 cpu, 9.0 io}, id = 16
> --   LogicalProject(sender_login=[$3], user_id=[LAST_VALUE($4) OVER (PARTITION BY $3 ORDER BY $1 ROWS 1 PRECEDING)]): rowcount = 1.2, cumulative cost = {11.399999999999999 rows, 11.4 cpu, 9.0 io}, id = 14
> --     LogicalJoin(condition=[=($0, $2)], joinType=[inner]): rowcount = 1.2, cumulative cost = {10.2 rows, 9.0 cpu, 9.0 io}, id = 12
> --       BeamIOSourceRel(table=[[beam, unbounded_stream]]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 1.0 io}, id = 1
> --       BeamIOSourceRel(table=[[beam, data_1_bounded]]): rowcount = 8.0, cumulative cost = {8.0 rows, 8.0 cpu, 8.0 io}, id = 3
> -- 
> -- Sets:
> -- Set#0, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time)
> --         rel#18:RelSubset#0.BEAM_LOGICAL, best=rel#1
> --                 rel#1:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, unbounded_stream]), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 1.0 io}
> --         rel#33:RelSubset#0.ENUMERABLE, best=rel#32
> --                 rel#32:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#18), rowcount=1.0, cumulative cost={1.7976931348623157E308 rows, 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
> -- Set#1, type: RecordType(BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
> --         rel#19:RelSubset#1.BEAM_LOGICAL, best=rel#3
> --                 rel#3:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, data_1_bounded]), rowcount=8.0, cumulative cost={8.0 rows, 8.0 cpu, 8.0 io}
> --         rel#36:RelSubset#1.ENUMERABLE, best=rel#35
> --                 rel#35:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#19), rowcount=8.0, cumulative cost={1.7976931348623157E308 rows, 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
> -- Set#2, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time, BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
> --         rel#21:RelSubset#2.NONE, best=null
> --                 rel#20:LogicalJoin.NONE(left=RelSubset#18,right=RelSubset#19,condition==($0, $2),joinType=inner), rowcount=1.2, cumulative cost={inf}
> --                 rel#40:LogicalProject.NONE(input=RelSubset#39,exprs=[$3, $4, $0, $1, $2]), rowcount=1.2, cumulative cost={inf}
> --                 rel#65:LogicalCalc.NONE(input=RelSubset#39,expr#0..4={inputs},0=$t3,1=$t4,2=$t0,3=$t1,4=$t2), rowcount=1.2, cumulative cost={inf}
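> 
> -- Roughly what I am after, as a sketch outside SQL (assuming the Java SDK state API; 
> -- the key/value types and names below are only illustrative): per-key state emits the 
> -- previous record as soon as the next element for that key arrives, without waiting 
> -- for any window to close.
> /*
> import org.apache.beam.sdk.state.StateSpec;
> import org.apache.beam.sdk.state.StateSpecs;
> import org.apache.beam.sdk.state.ValueState;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.values.KV;
> 
> // Emits, for every element, the previously seen value for the same key (if any).
> class PreviousValuePerKeyFn extends DoFn<KV<String, String>, KV<String, String>> {
> 
>   @StateId("previous")
>   private final StateSpec<ValueState<String>> previousSpec = StateSpecs.value();
> 
>   @ProcessElement
>   public void processElement(
>       @Element KV<String, String> element,
>       @StateId("previous") ValueState<String> previous,
>       OutputReceiver<KV<String, String>> out) {
>     String prev = previous.read();          // null for the first element of this key
>     if (prev != null) {
>       out.output(KV.of(element.getKey(), prev));
>     }
>     previous.write(element.getValue());     // remember the current value for the next element
>   }
> }
> */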
> 