Hi,

After experimenting with Beam SQL I found some limitations. I was testing on a near-latest main (precisely `5aad2467469fafd2ed2dd89012bc80c0cd76b168`) with Calcite, the direct runner, and openjdk version "11.0.19". Please let me know if some of them are known, being worked on, have tickets, or have an estimated fix time. I believe most of them are low-hanging fruit, or my approach to the problem is simply wrong; if that is the case, please guide me to a working solution.

From my perspective it is fine to have a fix only on master - no need to wait for a release. Priority order:
- 7. Windowing function on a stream. In detail: how do I get the previous message for a key? Setting the state expiration arbitrarily large is ok, but access to the previous record must happen fairly quickly, not wait for the big window to finish and emit the expired keys. Ideally I would like to do it in a pure Beam pipeline, since saving to some external key/value store and then reading it back here could result in race conditions, which I would like to avoid - but if that is the only option, so be it.
- 5. Only a single UNION ALL is possible
- 4. UNNEST ARRAY with nested ROW
- 3. Using * when there is a ROW type present in the schema
- 1. `CROSS JOIN` between two unrelated tables is not supported - even if one is a static number table
- 2. ROW construction is not supported, so it is not possible to nest data

Below are the queries that I use to test these scenarios.

Thank you for looking at these topics!

Best

Wiśniowski Piotr

-----------------------
-- 1. `CROSS JOIN` between two unrelated tables is not supported.
-----------------------
-- Only `CROSS JOIN UNNEST` is supported, when exploding an array from the same table.
-- It is not possible to number rows.
WITH data_table AS (
    SELECT 1 AS a
),
number_table AS (
    SELECT
        numbers_exploded AS number_item
    FROM UNNEST(ARRAY[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]) AS numbers_exploded
)
SELECT
    data_table.a,
    number_table.number_item
FROM data_table
CROSS JOIN number_table
;
-- CROSS JOIN and JOIN ON FALSE are not supported!
-----------------------
-- 2. ROW construction is not supported. It is not possible to nest data.
-----------------------
SELECT ROW(1, 2, 'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
SELECT (1, 2, 'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
SELECT MAP['field1', 1, 'field2', 'a']; -- Parameters must be of the same type
SELECT MAP['field1', 'b', 'field2', 'a']; -- null
-- WORKAROUND - manually compose a JSON string.
-- Drawback - decomposing might not be supported, or would also need to be based on string operations.
SELECT ('{"field1":"' || 1 || '","field2":"' || 'a' || '"}') AS `json_object`;
-----------------------
-- 3. Using * when there is a ROW type present in the schema
-----------------------
CREATE EXTERNAL TABLE test_tmp_1 (
    `ref` VARCHAR,
    `author` ROW<
        `name` VARCHAR,
        `email` VARCHAR
    >
)
TYPE text
LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
TBLPROPERTIES '{"format":"json", "deadLetterFile":"top/python/dbt/tests/dead"}';
SELECT * FROM test_tmp_1;
-- java.lang.NoSuchFieldException: name
-- WORKAROUND - refer to the columns explicitly, with aliases.
SELECT
    `ref` AS ref_value,
    test_tmp_1.`author`.`name` AS author_name, -- the table name must be referenced explicitly - this could be fixed too
    test_tmp_1.`author`.`email` AS author_email
FROM test_tmp_1;
-----------------------
-- 4. UNNEST ARRAY with nested ROW
-----------------------
CREATE EXTERNAL TABLE test_tmp (
    `ref` VARCHAR,
    `commits` ARRAY<ROW<
        `id` VARCHAR,
        `author` ROW<
            `name` VARCHAR,
            `email` VARCHAR
        >
    >>
)
TYPE text
LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
TBLPROPERTIES '{"format":"json", "deadLetterFile":"python/dbt/tests/dead"}';
SELECT
    test_tmp.`ref` AS branch_name,
    commit_item.`id` AS commit_hash,
    commit_item.`author`.`name` AS author_name
FROM test_tmp
CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
-- Row expected 4 fields (Field{name=ref, description=, type=STRING, options={{}}}, Field{name=commits, description=, type=ARRAY<ROW<id STRING, author ROW<name STRING, email STRING>> NOT NULL>, options={{}}}, Field{name=id, description=, type=STRING, options={{}}}, Field{name=author, description=, type=ROW<name STRING, email STRING>, options={{}}}). initialized with 5 fields.
-- limited WORKAROUND - refer to the array elements by index and UNION ALL the items into rows.
-- Note: a workaround that uses a number table will not work, as CROSS JOIN is not supported.
WITH data_parsed AS (
    SELECT
        test_tmp.`ref` AS branch_id,
        test_tmp.commits[1].`id` AS commit_hash,
        test_tmp.commits[1].`author`.`name` AS author_name
    FROM test_tmp
    UNION ALL -- this unfortunately works only for two indexes
    SELECT
        test_tmp.`ref` AS branch_id,
        test_tmp.commits[2].`id` AS commit_hash,
        test_tmp.commits[2].`author`.`name` AS author_name
    FROM test_tmp
)
SELECT *
FROM data_parsed
WHERE author_name IS NOT NULL
;
-- better WORKAROUND - but tricky to get right (fragile)
WITH data_with_number_array AS (
    SELECT
        test_tmp.`ref` AS branch_name, -- there must be some primary key in the data to join on later, due to the CROSS JOIN support limitation
        ARRAY[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16] AS number_array,
        CARDINALITY(test_tmp.commits) AS commits_size
    FROM test_tmp
),
data_with_numbers AS (
    SELECT
        branch_name,
        `EXPR$0` AS number_item
    FROM data_with_number_array
    CROSS JOIN UNNEST(data_with_number_array.number_array) AS exploded
    WHERE `EXPR$0` <= commits_size
),
data_exploded AS (
    SELECT
        test_tmp.`ref` AS branch_name,
        test_tmp.commits[data_with_numbers.number_item].`id` AS commit_hash,
        test_tmp.commits[data_with_numbers.number_item].`author`.`name` AS author_name
    FROM test_tmp
    INNER JOIN data_with_numbers
    ON data_with_numbers.branch_name = test_tmp.`ref`
)
SELECT
    branch_name,
    commit_hash,
    author_name
FROM data_exploded
-- WHERE author_name IS NOT NULL - not possible here due to `Non equi-join is not supported`,
-- as it pushes this condition as a predicate down to the join.
-- Is there any way to force checking this condition here, rather than having it projected upstream?
;
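
Side note: outside of SQL, exploding the commits array over schema Rows in the Java SDK is straightforward, which is what I fall back to for now. A rough, untested sketch against the test_tmp schema above; the output schema and the class name are my own choices for illustration:

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.Row;

// Flattens each element of the `commits` array into its own output Row.
// The output schema and the class name are illustrative choices.
class ExplodeCommitsFn extends DoFn<Row, Row> {

  static final Schema OUT =
      Schema.builder()
          .addStringField("branch_name")
          .addStringField("commit_hash")
          .addStringField("author_name")
          .build();

  @ProcessElement
  public void processElement(@Element Row row, OutputReceiver<Row> out) {
    // getArray needs the explicit type witness to iterate Rows.
    for (Row commit : row.<Row>getArray("commits")) {
      out.output(
          Row.withSchema(OUT)
              .addValues(
                  row.getString("ref"),
                  commit.getString("id"),
                  commit.getRow("author").getString("name"))
              .build());
    }
  }
}

It would be wired up as rows.apply(ParDo.of(new ExplodeCommitsFn())).setRowSchema(ExplodeCommitsFn.OUT).
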
-----------------------
-- 5. Only a single UNION ALL is possible
-----------------------
SELECT 1 AS a
UNION ALL
SELECT 2 AS a
UNION ALL
SELECT 3 AS a;
-- Wrong number of arguments to BeamUnionRel: org.apache.beam.sdk.values.PCollectionList@70f145ac
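
As a side note, the pure Beam counterpart of UNION ALL, Flatten, is not limited to two inputs, so until this is fixed I can at least merge more branches outside of SQL. A minimal sketch; the class name and the Create.of inputs are mine, standing in for the three SELECT branches above:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class UnionAllOutsideSql {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    // Stand-ins for the three SELECT branches of the UNION ALL above.
    PCollection<Integer> a = p.apply("one", Create.of(1));
    PCollection<Integer> b = p.apply("two", Create.of(2));
    PCollection<Integer> c = p.apply("three", Create.of(3));
    // Flatten merges any number of collections of the same type.
    PCollectionList.of(a).and(b).and(c).apply(Flatten.pCollections());
    p.run().waitUntilFinish();
  }
}
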
-----------------------
-- 6. Reserved names
-----------------------
-- json_object
SELECT '{}' AS json_object;
-- parse failed: Incorrect syntax near the keyword 'AS' at line 1, column 13.
-- WORKAROUND: SELECT '{}' AS `json_object`
-----------------------
-- 7. Windowing function on a stream
-----------------------
-- In detail: how do I get the previous message for a key?
-- Setting the state expiration arbitrarily large is ok, but access to the previous record must happen fairly quickly,
-- not wait for the big window to finish and emit the expired keys.
-- Ideally I would like to do it in a pure Beam pipeline, as saving to some external key/value store
-- and then reading it back here could result in race conditions that would be hard to debug.
-- (A pure-Beam sketch of what I have in mind is at the end of this mail.)
DROP TABLE IF EXISTS unbounded_stream;
CREATE EXTERNAL TABLE unbounded_stream (
    sequence BIGINT,
    event_time TIMESTAMP
)
TYPE 'sequence'
TBLPROPERTIES '{"elementsPerSecond":1}'
;
CREATE EXTERNAL TABLE data_1_bounded (
    `sequence_nb` BIGINT,
    `sender_login` VARCHAR,
    `user_id` VARCHAR
)
TYPE text
LOCATION 'python/dbt/tests/sql_functionality_tests/04_get_last_identity/data_1.jsonl'
TBLPROPERTIES '{"format":"json", "deadLetterFile":"python/dbt/tests/dead_letters/dead"}'
;
WITH
test_data_1_unbounded AS (
    SELECT
        sender_login,
        user_id,
        event_time
    FROM unbounded_stream
    INNER JOIN data_1_bounded
    ON unbounded_stream.sequence = data_1_bounded.sequence_nb
),
test_data_1_lookbehind AS (
    SELECT
        sender_login,
        LAST_VALUE(user_id) OVER previous_win AS user_id
    FROM test_data_1_unbounded
    WINDOW previous_win AS (
        PARTITION BY sender_login
        ORDER BY event_time ASC
        ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
    )
)
SELECT *
FROM test_data_1_lookbehind
LIMIT 8
;
-- There are not enough rules to produce a node with desired properties: convention=ENUMERABLE. All the inputs have relevant nodes, however the cost is still infinite.
-- Root: rel#29:RelSubset#4.ENUMERABLE
-- Original rel:
-- LogicalSort(fetch=[8]): rowcount = 1.2, cumulative cost = {12.599999999999998 rows, 35.4 cpu, 9.0 io}, id = 16
--   LogicalProject(sender_login=[$3], user_id=[LAST_VALUE($4) OVER (PARTITION BY $3 ORDER BY $1 ROWS 1 PRECEDING)]): rowcount = 1.2, cumulative cost = {11.399999999999999 rows, 11.4 cpu, 9.0 io}, id = 14
--     LogicalJoin(condition=[=($0, $2)], joinType=[inner]): rowcount = 1.2, cumulative cost = {10.2 rows, 9.0 cpu, 9.0 io}, id = 12
--       BeamIOSourceRel(table=[[beam, unbounded_stream]]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 1.0 io}, id = 1
--       BeamIOSourceRel(table=[[beam, data_1_bounded]]): rowcount = 8.0, cumulative cost = {8.0 rows, 8.0 cpu, 8.0 io}, id = 3
--
-- Sets:
-- Set#0, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time)
--   rel#18:RelSubset#0.BEAM_LOGICAL, best=rel#1
--     rel#1:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, unbounded_stream]), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 1.0 io}
--   rel#33:RelSubset#0.ENUMERABLE, best=rel#32
--     rel#32:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#18), rowcount=1.0, cumulative cost={1.7976931348623157E308 rows, 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
-- Set#1, type: RecordType(BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
--   rel#19:RelSubset#1.BEAM_LOGICAL, best=rel#3
--     rel#3:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, data_1_bounded]), rowcount=8.0, cumulative cost={8.0 rows, 8.0 cpu, 8.0 io}
--   rel#36:RelSubset#1.ENUMERABLE, best=rel#35
--     rel#35:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#19), rowcount=8.0, cumulative cost={1.7976931348623157E308 rows, 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
-- Set#2, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time, BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
--   rel#21:RelSubset#2.NONE, best=null
--     rel#20:LogicalJoin.NONE(left=RelSubset#18,right=RelSubset#19,condition==($0, $2),joinType=inner), rowcount=1.2, cumulative cost={inf}
--     rel#40:LogicalProject.NONE(input=RelSubset#39,exprs=[$3, $4, $0, $1, $2]), rowcount=1.2, cumulative cost={inf}
--     rel#65:LogicalCalc.NONE(input=RelSubset#39,expr#0..4={inputs},0=$t3,1=$t4,2=$t0,3=$t1,4=$t2), rowcount=1.2, cumulative cost={inf}
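
PS. For point 7, this is roughly what I have in mind as a pure Beam (Java) alternative: keep the last value per key in the state API and emit it as soon as the next record arrives, instead of waiting for a window to close. A minimal, untested sketch; the DoFn name and the KV<String, String> element type are only my assumptions for illustration:

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Emits, for every element, the previously seen value for the same key.
class PreviousValueFn extends DoFn<KV<String, String>, KV<String, String>> {

  // Per-key (and per-window) cell holding the last value seen for the key.
  @StateId("previous")
  private final StateSpec<ValueState<String>> previousSpec = StateSpecs.value();

  @ProcessElement
  public void processElement(
      @Element KV<String, String> element,
      @StateId("previous") ValueState<String> previous,
      OutputReceiver<KV<String, String>> out) {
    String prev = previous.read(); // null for the first record of a key
    if (prev != null) {
      // The previous record is available immediately - no need to wait
      // for a big window to close and expire its keys.
      out.output(KV.of(element.getKey(), prev));
    }
    previous.write(element.getValue());
    // A cleanup @TimerId could be added here to bound the state lifetime.
  }
}

It would be applied right after keying the stream, e.g. keyed.apply(ParDo.of(new PreviousValueFn())). State lives per key and window, so in the global window the last value stays available until a timer clears it, while each output is still produced immediately.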
