Looks like the code that converts the parsed ZetaSQL to a Calcite logical expression doesn't currently support aggregate functions with multiple columns. See this TODO: https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/AggregateScanConverter.java#L147
On Tue, Feb 9, 2021 at 10:22 AM Sonam Ramchand < [email protected]> wrote: > Hi Devs, > I am trying to test the COVAR_POP aggregate function for the ZetaSql > dialect. I see > https://github.com/apache/beam/blob/b74fcf7b30d956fb42830d652a57b265a1546973/sdks/[…]he/beam/sdk/extensions/sql/impl/transform/agg/CovarianceFn.java > <https://github.com/apache/beam/blob/b74fcf7b30d956fb42830d652a57b265a1546973/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/CovarianceFn.java#L50> > is > implemented as CombineFn and it works correctly > https://github.com/apache/beam/blob/befcc3d780d561e81f23512742862a65c0ae3b69/sdks/[…]eam/sdk/extensions/sql/BeamSqlDslAggregationCovarianceTest.java > <https://github.com/apache/beam/blob/befcc3d780d561e81f23512742862a65c0ae3b69/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationCovarianceTest.java#L87> > .However, for ZetaSql dialect, it throws: > covar_pop has more than one argument. > java.lang.IllegalArgumentException: covar_pop has more than one argument.Unit > test: > > public void testZetaSqlCovarPop() { > String sql = "SELECT COVAR_POP(row_id,int64_col) FROM table_all_types > GROUP BY bool_col"; > > ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); > BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); > PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, > beamRelNode); > > final Schema schema = Schema.builder().addDoubleField("field1").build(); > PAssert.that(stream) > .containsInAnyOrder( > Row.withSchema(schema).addValue(-1.00000).build(), > Row.withSchema(schema).addValue(-1.55556).build()); > > > pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); > } > > Can anybody help me in understanding the cause of this problem? I do not > understand how it works correctly in other places and not in ZetaSqlDialect. > For reference: https://github.com/apache/beam/pull/13915 > > I would really appreciate any sort of input on this. > -- > > Regards, > *Sonam* > Software Engineer > Mobile: +92 3088337296 <+92%20308%208337296> > > <http://venturedive.com/> >
