回复: Flink sql 自定义udaf函数 出现 No match found for function signature count_uadf()

2021-05-07 文章 JackJia
您好,能否把解决的思路介绍一下?


祝好
在2020年12月18日 10:38,丁浩浩<18579099...@163.com> 写道:
问题我自己已经解决。

在 2020年12月17日,下午9:00,丁浩浩 <18579099...@163.com> 写道:

flink版本:1.11.1
udaf函数代码来自于阿里云官网文档

以下是代码
public class TestSql {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = FlinkUtils.getTableEnv(env);
//env.setParallelism(3);
tableEnv.createTemporarySystemFunction("count_uadf", CountUdaf.class);

Properties configs = CommonUtils.getConfigs();
//注册clazz源表
FlinkUtils.registerMysqlTable2FlinkTable(
tableEnv,configs.getProperty("url"),
configs.getProperty("user.name"), configs.getProperty("password"),
“test", "clazz_lesson");

Table table = tableEnv.sqlQuery("select count_uadf(clazz_number),clazz_number 
from clazz_lesson group by clazz_number");
//Table table = tableEnv.sqlQuery("select number,collect(extension_value) from 
clazz_extension group by number ");
tableEnv.toRetractStream(table, Row.class).print();
env.execute();


}
}



public class CountUdaf extends AggregateFunction {
//定义存放count UDAF状态的accumulator的数据的结构。
public static class CountAccum {
public long total;
}

@Override
//初始化count UDAF的accumulator。
public CountAccum createAccumulator() {
CountAccum acc = new CountAccum();
acc.total = 0;
return acc;
}
@Override
//getValue提供了如何通过存放状态的accumulator计算count UDAF的结果的方法。
public Long getValue(CountAccum accumulator) {
return accumulator.total;
}


//accumulate提供了如何根据输入的数据更新count UDAF存放状态的accumulator。
public void accumulate(CountAccum accumulator, Long iValue) {
accumulator.total++;
}
public void merge(CountAccum accumulator, Iterable its) {
for (CountAccum other : its) {
accumulator.total += other.total;
}
}
}

以下是堆栈信息

-
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. From line 1, column 8 to line 1, column 31: No match found 
for function signature count_uadf()
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
at com.gaotu.data.performance.flink.job.sql.TestSql.main(TestSql.java:34)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
column 8 to line 1, column 31: No match found for function signature 
count_uadf()
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:481)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 

Re: Flink sql 自定义udaf函数 出现 No match found for function signature count_uadf()

2020-12-17 文章 丁浩浩
问题我自己已经解决。

> 在 2020年12月17日,下午9:00,丁浩浩 <18579099...@163.com> 写道:
> 
> flink版本:1.11.1
> udaf函数代码来自于阿里云官网文档
> 
> 以下是代码
> public class TestSql {
>public static void main(String[] args) throws Exception {
>StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>StreamTableEnvironment tableEnv = FlinkUtils.getTableEnv(env);
>//env.setParallelism(3);
>tableEnv.createTemporarySystemFunction("count_uadf", CountUdaf.class);
> 
>Properties configs = CommonUtils.getConfigs();
>//注册clazz源表
>FlinkUtils.registerMysqlTable2FlinkTable(
>tableEnv,configs.getProperty("url"),
>configs.getProperty("user.name"), 
> configs.getProperty("password"),
>“test", "clazz_lesson");
> 
>Table table = tableEnv.sqlQuery("select 
> count_uadf(clazz_number),clazz_number from clazz_lesson group by 
> clazz_number");
>//Table table = tableEnv.sqlQuery("select 
> number,collect(extension_value) from clazz_extension group by number ");
>tableEnv.toRetractStream(table, Row.class).print();
>env.execute();
> 
> 
>}
> }
> 
> 
> 
> public class CountUdaf extends AggregateFunction {
>//定义存放count UDAF状态的accumulator的数据的结构。
>public static class CountAccum {
>public long total;
>}
> 
>@Override
>//初始化count UDAF的accumulator。
>public CountAccum createAccumulator() {
>CountAccum acc = new CountAccum();
>acc.total = 0;
>return acc;
>}
>@Override
>//getValue提供了如何通过存放状态的accumulator计算count UDAF的结果的方法。
>public Long getValue(CountAccum accumulator) {
>return accumulator.total;
>}
> 
> 
>//accumulate提供了如何根据输入的数据更新count UDAF存放状态的accumulator。
>public void accumulate(CountAccum accumulator, Long iValue) {
>accumulator.total++;
>}
>public void merge(CountAccum accumulator, Iterable its) {
>for (CountAccum other : its) {
>accumulator.total += other.total;
>}
>}
> }
> 
> 以下是堆栈信息
> 
> -
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 1, column 8 to line 1, column 31: No match 
> found for function signature count_uadf()
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
>   at 
> com.gaotu.data.performance.flink.job.sql.TestSql.main(TestSql.java:34)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
> column 8 to line 1, column 31: No match found for function signature 
> count_uadf()
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
>   at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
>   at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
>   at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
>   at 
> 

Flink sql 自定义udaf函数 出现 No match found for function signature count_uadf()

2020-12-17 文章 丁浩浩
flink版本:1.11.1
udaf函数代码来自于阿里云官网文档

以下是代码
public class TestSql {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = FlinkUtils.getTableEnv(env);
//env.setParallelism(3);
tableEnv.createTemporarySystemFunction("count_uadf", CountUdaf.class);

Properties configs = CommonUtils.getConfigs();
//注册clazz源表
FlinkUtils.registerMysqlTable2FlinkTable(
tableEnv,configs.getProperty("url"),
configs.getProperty("user.name"), 
configs.getProperty("password"),
“test", "clazz_lesson");

Table table = tableEnv.sqlQuery("select 
count_uadf(clazz_number),clazz_number from clazz_lesson group by clazz_number");
//Table table = tableEnv.sqlQuery("select 
number,collect(extension_value) from clazz_extension group by number ");
tableEnv.toRetractStream(table, Row.class).print();
env.execute();


}
}



public class CountUdaf extends AggregateFunction {
//定义存放count UDAF状态的accumulator的数据的结构。
public static class CountAccum {
public long total;
}

@Override
//初始化count UDAF的accumulator。
public CountAccum createAccumulator() {
CountAccum acc = new CountAccum();
acc.total = 0;
return acc;
}
@Override
//getValue提供了如何通过存放状态的accumulator计算count UDAF的结果的方法。
public Long getValue(CountAccum accumulator) {
return accumulator.total;
}


//accumulate提供了如何根据输入的数据更新count UDAF存放状态的accumulator。
public void accumulate(CountAccum accumulator, Long iValue) {
accumulator.total++;
}
public void merge(CountAccum accumulator, Iterable its) {
for (CountAccum other : its) {
accumulator.total += other.total;
}
}
}

以下是堆栈信息

-
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. From line 1, column 8 to line 1, column 31: No match found 
for function signature count_uadf()
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
at 
com.gaotu.data.performance.flink.job.sql.TestSql.main(TestSql.java:34)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
column 8 to line 1, column 31: No match found for function signature 
count_uadf()
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:481)
at