Hi Devs,
I am trying to test the COVAR_POP aggregate function for the ZetaSQL
dialect. I see that CovarianceFn
<https://github.com/apache/beam/blob/b74fcf7b30d956fb42830d652a57b265a1546973/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/CovarianceFn.java#L50>
is implemented as a CombineFn, and it works correctly in
BeamSqlDslAggregationCovarianceTest
<https://github.com/apache/beam/blob/befcc3d780d561e81f23512742862a65c0ae3b69/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationCovarianceTest.java#L87>.
However, with the ZetaSQL dialect it throws:

java.lang.IllegalArgumentException: covar_pop has more than one argument.

Unit test:

  public void testZetaSqlCovarPop() {
    String sql =
        "SELECT COVAR_POP(row_id,int64_col) FROM table_all_types GROUP BY bool_col";

    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);

    final Schema schema = Schema.builder().addDoubleField("field1").build();
    PAssert.that(stream)
        .containsInAnyOrder(
            Row.withSchema(schema).addValue(-1.00000).build(),
            Row.withSchema(schema).addValue(-1.55556).build());
    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
  }

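For context, the two-argument calculation I expect COVAR_POP(x, y) to perform is the population covariance: the mean of the products of each pair's deviations from the two column means. Below is a minimal standalone sketch of that formula. The class name CovarPopSketch and the method covarPop are hypothetical and only for illustration; this is not Beam's CovarianceFn, which, being a CombineFn, works through accumulators rather than a two-pass loop over arrays.

```java
public class CovarPopSketch {

  // Population covariance: sum((x_i - meanX) * (y_i - meanY)) / n.
  // Hypothetical helper for illustration only; not part of Beam.
  static double covarPop(double[] x, double[] y) {
    if (x.length != y.length || x.length == 0) {
      throw new IllegalArgumentException("inputs must be non-empty and of equal length");
    }
    int n = x.length;

    // First pass: means of both columns.
    double sumX = 0.0;
    double sumY = 0.0;
    for (int i = 0; i < n; i++) {
      sumX += x[i];
      sumY += y[i];
    }
    double meanX = sumX / n;
    double meanY = sumY / n;

    // Second pass: sum of products of deviations from the means.
    double sumOfProducts = 0.0;
    for (int i = 0; i < n; i++) {
      sumOfProducts += (x[i] - meanX) * (y[i] - meanY);
    }

    // Divide by n (population), not n - 1 (sample).
    return sumOfProducts / n;
  }

  public static void main(String[] args) {
    double[] x = {1, 2, 3, 4};
    double[] y = {2, 4, 6, 8};
    System.out.println(covarPop(x, y)); // prints 2.5
  }
}
```

The point of the sketch is only that the function inherently takes two input columns, so an argument-count check that rejects more than one argument cannot be satisfied by any valid COVAR_POP call.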
Can anybody help me understand the cause of this problem? I do not
understand why it works correctly elsewhere but not with the ZetaSQL dialect.
For reference: https://github.com/apache/beam/pull/13915
I would really appreciate any input on this.
--
Regards,
*Sonam*
Software Engineer