Hi Junrui,
Thank you for the pointer. I had read that page, and the function works fine
with the Java Table API, but I'm trying to use the Top2 accumulator from
SQL. I can't use a LEFT JOIN LATERAL on it, since validation fails with
"cannot be used as a table function". I don't think a join is the right thing
anyway, since Top2 is a table aggregate function, not a table function.

    tEnv.createTemporaryFunction("TOP2", Top2.class);

    var calculated2 = tEnv.sqlQuery(
        "SELECT " +
            "  TUMBLE_START(ts, INTERVAL '1' SECOND) as w_start, " +
            "  TUMBLE_END(ts, INTERVAL '1' SECOND) as w_end, " +
            "  TUMBLE_ROWTIME(ts, INTERVAL '1' SECOND) as w_rowtime, " +
            "  id, " +
            "  top1, " +
            "  top2 " +
            "FROM " +
            "  source " +
            "  LEFT JOIN LATERAL TABLE(TOP2(val)) ON TRUE " +
            "GROUP BY " +
            "  TUMBLE(ts, INTERVAL '1' SECOND), " +
            "  id"
    ).printExplain();

Gives the following:

    org.apache.flink.table.api.ValidationException: SQL validation failed. Function 'default_catalog.default_database.TOP2' cannot be used as a table function.
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
        at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:261)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
        at app//org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:708)
        at app//io.grepr.query.MetricsTableApiTest.test(MetricsTableApiTest.java:129)
        Caused by: org.apache.flink.table.api.ValidationException: Function 'default_catalog.default_database.TOP2' cannot be used as a table function.
            at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.verifyFunctionKind(FunctionCatalogOperatorTable.java:200)
            at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:133)
            at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:126)
            at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
            at java.base@11.0.22/java.util.Optional.flatMap(Optional.java:294)
            at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:100)
            at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:69)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1310)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1296)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1296)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1296)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1296)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:993)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:749)
            at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196)
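
To make the "table aggregate, not table function" distinction concrete, here is a minimal, dependency-free sketch of what a Top2-style function does. It does not use Flink: the class name Top2Sketch is hypothetical, the method names only mirror the shape of a table aggregate (createAccumulator, accumulate, emitValue), and the (value, rank) output layout is an assumption rather than the layout of the Top2 class used above.

```java
import java.util.ArrayList;
import java.util.List;

/** Dependency-free sketch of a Top2-style table aggregate; not Flink's actual classes. */
public class Top2Sketch {

    /** Accumulator: the two largest values seen so far within one group. */
    static final class Top2Accumulator {
        Integer first;   // largest value, null until a row arrives
        Integer second;  // second-largest value, null until two rows arrive
    }

    static Top2Accumulator createAccumulator() {
        return new Top2Accumulator();
    }

    /** Called once per input row of the group. */
    static void accumulate(Top2Accumulator acc, int value) {
        if (acc.first == null || value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (acc.second == null || value > acc.second) {
            acc.second = value;
        }
    }

    /** Called once per group; returns zero, one, or two rows of (value, rank). */
    static List<int[]> emitValue(Top2Accumulator acc) {
        List<int[]> rows = new ArrayList<>();
        if (acc.first != null) {
            rows.add(new int[] {acc.first, 1});
        }
        if (acc.second != null) {
            rows.add(new int[] {acc.second, 2});
        }
        return rows;
    }

    public static void main(String[] args) {
        Top2Accumulator acc = createAccumulator();
        for (int v : new int[] {3, 9, 4, 7}) {
            accumulate(acc, v);
        }
        for (int[] row : emitValue(acc)) {
            System.out.println("value=" + row[0] + " rank=" + row[1]);
        }
    }
}
```

A table function maps one input row to zero or more output rows, which is the shape LATERAL TABLE(...) expects. The sketch instead consumes every row of a group before emitting anything, which is why it needs a grouping step (flatAggregate in the Table API) rather than a lateral join.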


Jad Naous
<https://streaklinks.com/B4XIkd1eHqitzddwoQduqyI4/https%3A%2F%2Fwww.linkedin.com%2Fin%2Fjadnaous%2F>
Grepr, CEO/Founder


On Thu, Mar 7, 2024 at 9:43 AM Junrui Lee <jrlee....@gmail.com> wrote:

> Hi Jad,
>
> You can refer to the CREATE FUNCTION section (
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-function)
> and the Table Aggregate Functions section (
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-aggregate-functions)
> for details on creating and using these functions.
>
> Best regards,
> Junrui
>
> On Thu, Mar 7, 2024 at 22:19, Jad Naous <j...@grepr.ai> wrote:
>
>> Hi,
>> The docs don't mention the correct syntax for using UDTAGGs in SQL. Is it
>> possible to use them with SQL?
>> Thanks,
>> Jad Naous
>> <https://streaklinks.com/B4WFYnXoaG38kHdC2w4IJpgT/https%3A%2F%2Fwww.linkedin.com%2Fin%2Fjadnaous%2F>
>> Grepr, CEO/Founder
>>
>
