Hi Rui,
Thanks for this info. It’s good to know that window functions are already
supported. However, I still have a problem with the schema of the query result.
This is my code (with Beam 2.28):
Schema appSchema = Schema
    .builder()
    .addInt32Field("foo")
    .addInt32Field("bar")
    .build();
Row rowOne = Row.withSchema(appSchema).addValues(1, 1).build();
Row rowTwo = Row.withSchema(appSchema).addValues(1, 2).build();
PCollection<Row> inputRows = executionContext.getPipeline()
    .apply(Create.of(rowOne, rowTwo))
    .setRowSchema(appSchema);
String sql = "SELECT foo, bar, RANK() over (PARTITION BY foo ORDER BY bar) AS agg "
    + "FROM PCOLLECTION";
PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql));
I can see the expected data in the result, but I don’t see an “agg” column in
the schema; the third field is named w0$o0 instead (see below). Do you have any
ideas regarding this issue? Thanks!
The Beam schema of the result is:
Field{name=foo, description=, type=FieldType{typeName=INT32, nullable=false,
logicalType=null, collectionElementType=null, mapKeyType=null,
mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Field{name=bar, description=, type=FieldType{typeName=INT32, nullable=false,
logicalType=null, collectionElementType=null, mapKeyType=null,
mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Field{name=w0$o0, description=, type=FieldType{typeName=INT64, nullable=false,
logicalType=null, collectionElementType=null, mapKeyType=null,
mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Here are some detailed logs if they are helpful:
[main] INFO org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner - SQL:
SELECT `PCOLLECTION`.`foo`, `PCOLLECTION`.`bar`, RANK() OVER (PARTITION BY
`PCOLLECTION`.`foo` ORDER BY `PCOLLECTION`.`bar`) AS `agg`
FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
[main] INFO org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner -
SQLPlan>
LogicalProject(foo=[$0], bar=[$1], agg=[RANK() OVER (PARTITION BY $0 ORDER BY
$1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
BeamIOSourceRel(table=[[beam, PCOLLECTION]])
[main] INFO org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner -
BEAMPlan>
BeamWindowRel(window#0=[window(partition {0} order by [1] range between
UNBOUNDED PRECEDING and CURRENT ROW aggs [RANK()])])
BeamIOSourceRel(table=[[beam, PCOLLECTION]])
From: Rui Wang <[email protected]>
Date: Tuesday, March 2, 2021 at 10:43 AM
To: Tao Li <[email protected]>
Cc: "[email protected]" <[email protected]>
Subject: Re: Regarding the over window query support from Beam SQL
Hi Tao,
[1] shows which functions work with the OVER clause. RANK is one of the
supported functions. Can you take a look?
[1]:
https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamAnalyticFunctionsTest.java
-Rui
On Tue, Mar 2, 2021 at 9:24 AM Tao Li <[email protected]>
wrote:
+ Rui Wang. It looks like Rui has been working on this Jira issue.
From: Tao Li <[email protected]>
Date: Monday, March 1, 2021 at 9:51 PM
To: "[email protected]" <[email protected]>
Subject: Regarding the over window query support from Beam SQL
Hi Beam community,
Querying over a window (for ranking, etc.) is pretty common in SQL use cases. I
have found this Jira issue:
https://issues.apache.org/jira/browse/BEAM-9198
Do we have a plan to support this? If there is no such plan in the near future,
are Beam developers supposed to implement this functionality on their own (e.g.
by using GroupBy)? Thanks!
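
To make the "implement it on their own" option concrete, here is a rough sketch of what a hand-rolled ranking via grouping might look like. It is plain Java over in-memory collections and does not use any Beam API; the class name ManualRankSketch, the method rankWithinPartitions, and the {foo, bar} int-array row layout are all hypothetical and chosen only for illustration. It emulates RANK() OVER (PARTITION BY foo ORDER BY bar): group by the partition key, sort each group, then number the rows so that ties share a rank.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only: plain Java, not Beam API.
final class ManualRankSketch {

    /**
     * Takes rows of the form {foo, bar} and returns rows of the form
     * {foo, bar, rank}, where rank emulates
     * RANK() OVER (PARTITION BY foo ORDER BY bar).
     */
    static List<int[]> rankWithinPartitions(List<int[]> rows) {
        // Step 1: group rows by the partition key (foo).
        // A TreeMap is used only to make the output order deterministic.
        Map<Integer, List<int[]>> partitions = new TreeMap<>();
        for (int[] row : rows) {
            partitions.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
        }

        List<int[]> ranked = new ArrayList<>();
        for (List<int[]> partition : partitions.values()) {
            // Step 2: order each partition by the ordering column (bar).
            partition.sort(Comparator.comparingInt((int[] r) -> r[1]));

            // Step 3: assign ranks. Rows with equal bar share a rank, and the
            // next distinct value jumps to its 1-based position (RANK, not DENSE_RANK).
            int rank = 0;
            for (int i = 0; i < partition.size(); i++) {
                int[] row = partition.get(i);
                if (i == 0 || row[1] != partition.get(i - 1)[1]) {
                    rank = i + 1;
                }
                ranked.add(new int[] {row[0], row[1], rank});
            }
        }
        return ranked;
    }

    public static void main(String[] args) {
        List<int[]> rows = Arrays.asList(
            new int[] {1, 2}, new int[] {1, 1}, new int[] {2, 5}, new int[] {1, 2});
        for (int[] r : rankWithinPartitions(rows)) {
            System.out.println(r[0] + ", " + r[1] + ", " + r[2]);
        }
    }
}
```

In a real pipeline the grouping step would presumably be done with a grouping transform and the per-group sort and numbering inside a per-key function, which also means each partition has to fit in memory on one worker. That trade-off is an assumption of this sketch, not something stated in the thread.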