回复: Flink sql 自定义udaf函数 出现 No match found for function signature count_uadf()
您好,能否把解决的思路介绍一下? 祝好 在2020年12月18日 10:38,丁浩浩<18579099...@163.com> 写道: 问题我自己已经解决。 在 2020年12月17日,下午9:00,丁浩浩 <18579099...@163.com> 写道: flink版本:1.11.1 udaf函数代码来自于阿里云官网文档 以下是代码 public class TestSql { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = FlinkUtils.getTableEnv(env); //env.setParallelism(3); tableEnv.createTemporarySystemFunction("count_uadf", CountUdaf.class); Properties configs = CommonUtils.getConfigs(); //注册clazz源表 FlinkUtils.registerMysqlTable2FlinkTable( tableEnv,configs.getProperty("url"), configs.getProperty("user.name"), configs.getProperty("password"), "test", "clazz_lesson"); Table table = tableEnv.sqlQuery("select count_uadf(clazz_number),clazz_number from clazz_lesson group by clazz_number"); //Table table = tableEnv.sqlQuery("select number,collect(extension_value) from clazz_extension group by number "); tableEnv.toRetractStream(table, Row.class).print(); env.execute(); } } public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> { //定义存放count UDAF状态的accumulator的数据的结构。 public static class CountAccum { public long total; } @Override //初始化count UDAF的accumulator。 public CountAccum createAccumulator() { CountAccum acc = new CountAccum(); acc.total = 0; return acc; } @Override //getValue提供了如何通过存放状态的accumulator计算count UDAF的结果的方法。 public Long getValue(CountAccum accumulator) { return accumulator.total; } //accumulate提供了如何根据输入的数据更新count UDAF存放状态的accumulator。 public void accumulate(CountAccum accumulator, Long iValue) { accumulator.total++; } public void merge(CountAccum accumulator, Iterable<CountAccum> its) { for (CountAccum other : its) { accumulator.total += other.total; } } } 以下是堆栈信息 - Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. 
From line 1, column 8 to line 1, column 31: No match found for function signature count_uadf() at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664) at com.gaotu.data.performance.flink.job.sql.TestSql.main(TestSql.java:34) Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 8 to line 1, column 31: No match found for function signature count_uadf() at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089) at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) at 
org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:481) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at
Re: Flink sql 自定义udaf函数 出现 No match found for function signature count_uadf()
问题我自己已经解决。 > 在 2020年12月17日,下午9:00,丁浩浩 <18579099...@163.com> 写道: > > flink版本:1.11.1 > udaf函数代码来自于阿里云官网文档 > > 以下是代码 > public class TestSql { >public static void main(String[] args) throws Exception { >StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); >StreamTableEnvironment tableEnv = FlinkUtils.getTableEnv(env); >//env.setParallelism(3); >tableEnv.createTemporarySystemFunction("count_uadf", CountUdaf.class); > >Properties configs = CommonUtils.getConfigs(); >//注册clazz源表 >FlinkUtils.registerMysqlTable2FlinkTable( >tableEnv,configs.getProperty("url"), >configs.getProperty("user.name"), > configs.getProperty("password"), >“test", "clazz_lesson"); > >Table table = tableEnv.sqlQuery("select > count_uadf(clazz_number),clazz_number from clazz_lesson group by > clazz_number"); >//Table table = tableEnv.sqlQuery("select > number,collect(extension_value) from clazz_extension group by number "); >tableEnv.toRetractStream(table, Row.class).print(); >env.execute(); > > >} > } > > > > public class CountUdaf extends AggregateFunction { >//定义存放count UDAF状态的accumulator的数据的结构。 >public static class CountAccum { >public long total; >} > >@Override >//初始化count UDAF的accumulator。 >public CountAccum createAccumulator() { >CountAccum acc = new CountAccum(); >acc.total = 0; >return acc; >} >@Override >//getValue提供了如何通过存放状态的accumulator计算count UDAF的结果的方法。 >public Long getValue(CountAccum accumulator) { >return accumulator.total; >} > > >//accumulate提供了如何根据输入的数据更新count UDAF存放状态的accumulator。 >public void accumulate(CountAccum accumulator, Long iValue) { >accumulator.total++; >} >public void merge(CountAccum accumulator, Iterable its) { >for (CountAccum other : its) { >accumulator.total += other.total; >} >} > } > > 以下是堆栈信息 > > - > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. 
From line 1, column 8 to line 1, column 31: No match > found for function signature count_uadf() > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664) > at > com.gaotu.data.performance.flink.job.sql.TestSql.main(TestSql.java:34) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, > column 8 to line 1, column 31: No match found for function signature > count_uadf() > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882) > at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305) > at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) > at > 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) > at >
Flink sql 自定义udaf函数 出现 No match found for function signature count_uadf()
flink版本:1.11.1 udaf函数代码来自于阿里云官网文档 以下是代码 public class TestSql { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = FlinkUtils.getTableEnv(env); //env.setParallelism(3); tableEnv.createTemporarySystemFunction("count_uadf", CountUdaf.class); Properties configs = CommonUtils.getConfigs(); //注册clazz源表 FlinkUtils.registerMysqlTable2FlinkTable( tableEnv,configs.getProperty("url"), configs.getProperty("user.name"), configs.getProperty("password"), "test", "clazz_lesson"); Table table = tableEnv.sqlQuery("select count_uadf(clazz_number),clazz_number from clazz_lesson group by clazz_number"); //Table table = tableEnv.sqlQuery("select number,collect(extension_value) from clazz_extension group by number "); tableEnv.toRetractStream(table, Row.class).print(); env.execute(); } } public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> { //定义存放count UDAF状态的accumulator的数据的结构。 public static class CountAccum { public long total; } @Override //初始化count UDAF的accumulator。 public CountAccum createAccumulator() { CountAccum acc = new CountAccum(); acc.total = 0; return acc; } @Override //getValue提供了如何通过存放状态的accumulator计算count UDAF的结果的方法。 public Long getValue(CountAccum accumulator) { return accumulator.total; } //accumulate提供了如何根据输入的数据更新count UDAF存放状态的accumulator。 public void accumulate(CountAccum accumulator, Long iValue) { accumulator.total++; } public void merge(CountAccum accumulator, Iterable<CountAccum> its) { for (CountAccum other : its) { accumulator.total += other.total; } } } 以下是堆栈信息 - Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. 
From line 1, column 8 to line 1, column 31: No match found for function signature count_uadf() at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664) at com.gaotu.data.performance.flink.job.sql.TestSql.main(TestSql.java:34) Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 8 to line 1, column 31: No match found for function signature count_uadf() at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089) at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) at 
org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:481) at