SqlValidatorException: No match found for function signature prod()
I'm hitting SqlValidatorException: No match found for function signature prod() when using a UDAF with Flink SQL 1.11. Could someone please take a look? _(:з」∠)_ Here is the code:

...
stableEnv.createTemporarySystemFunction("prod", ProductAggregateFunction.class);
Table resultTable = stableEnv.sqlQuery("select pf_id,prod(yldrate+1)-1 as yldrate from queryData group by pf_id");
...

@FunctionHint(
        input = @DataTypeHint("Double"),
        output = @DataTypeHint("Double")
)
public class ProductAggregateFunction extends AggregateFunction<Double, Product> {

    @Override
    public Double getValue(Product acc) {
        return acc.prod;
    }

    @Override
    public Product createAccumulator() {
        return new Product();
    }

    public void accumulate(Product acc, Double iValue) {
        acc.prod *= iValue;
    }

    public void retract(Product acc, Double iValue) {
        acc.prod /= iValue;
    }

    public void merge(Product acc, Iterable<Product> it) {
        for (Product p : it) {
            accumulate(acc, p.prod);
        }
    }

    public void resetAccumulator(Product acc) {
        acc.prod = 1D;
    }
}
Re:SqlValidatorException: No match found for function signature prod()
Bumping my own thread, still hoping for a reply. _(:з」∠)_

On 2021-02-20 13:14:18, "xiaoyue" <18242988...@163.com> wrote:
> I'm hitting SqlValidatorException: No match found for function signature prod() when using a UDAF with Flink SQL 1.11. Could someone please take a look? [...]
flink1.12: same source tables, sql_query() takes 9 min in pyflink vs 3 s in Java
Has anyone run into this? In a streaming environment I connect to a MySQL database and define two source tables, source1 and source2, via DDL.

sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND '20170307'"
# fetch the query result
query_table = env.sql_query(sql)
query_table.to_pandas()

With the same processing, Python is far slower than Java. What could be causing the gap? Since pyflink is only a thin wrapper over Flink's interfaces, could it be the GIL? Hoping an expert can weigh in; anyone who has hit the same problem is welcome to discuss. Thanks!
Re:Re: flink1.12: same source tables, sql_query() takes 9 min in pyflink vs 3 s in Java
Hi Xingbo,

Thanks a lot for the reply. I convert to pandas because I want to use the matrix computations in pandas. The project requirement is to compute with Flink and return the results directly to the frontend, so the pipeline has a source but no sink. I also tried defining a pandas-type UDAF that returns the aggregated result, but the execution time barely changed.

So for that kind of use case, is there a recommended way to run it, keeping things efficient while still using the matrix computations in pandas or numpy? Many thanks!

On 2021-03-01 09:54:49, "Xingbo Huang" wrote:
>Hi,
>
>The difference is your use of to_pandas(), which is slow: it has to collect all the data back to the client and build a Python DataFrame there. to_pandas() is mostly for debugging; it is very convenient but does not perform. If performance matters to you, just switch to a proper sink.
>
>Best
>Xingbo
>
>xiaoyue <18242988...@163.com> wrote on Fri, Feb 26, 2021 at 12:38 PM:
>[...]
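A minimal sketch of Xingbo's suggestion in terms of the snippet above, assuming env is the TableEnvironment and query_table the queried Table; the query_sink table and the print connector are illustrative stand-ins for a real sink:

env.execute_sql("""
    CREATE TABLE query_sink (
        ID STRING, NAME STRING, IP STRING, PHONE STRING
    ) WITH ('connector' = 'print')
""")
# write the result out on the cluster instead of collecting it to the client
query_table.execute_insert("query_sink").wait()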
Re: Some questions about using pyflink
Hello,

I'm currently doing the same kind of evaluation, combining pyflink with pandas for distributed computation, so for your questions I can only offer some practical observations:
- pyflink is expected to consolidate on DataStream going forward, so DataSet support is unlikely to come;
- I'd advise against converting between Table and DataFrame mid-computation via table.to_pandas(), as it hurts performance;
- the most efficient approach I've found is to define pandas-type udf/udaf functions, although pyflink remains much slower than the same approach through the Java API;
- in my experience the most time-consuming part of pyflink is the sql_query operation: the same SQL statement runs far slower than in Java.

These are only personal impressions; corrections from passing experts are welcome. Also, since we're researching the same area, I'd be glad to exchange new findings. Thanks, and best wishes!

xiao...@ysstech.com

From: qian he
Sent: 2021-03-14 18:59
To: user-zh-flink
Subject: Some questions about using pyflink
Hi,

Our project wants to use flink for distributed computation. The existing project is a Python pandas project, and we'd like to try migrating it to pyflink. When doing batch processing with DataSet, we found that, unlike the Java version, there are no map/reduce functions, which raises the following questions:
1. Does the Python flink SDK not support DataSet yet?
2. Is there an alternative?
3. If it isn't supported yet, is there a planned date?
4. Why doesn't flink Table support map/reduce operations?
5. Our project processes data with DataFrames; can that be run as distributed computation on flink? Converting the DataFrame directly into a Table doesn't help, since Table doesn't support map/reduce. Any advice for migrating a pandas project to flink?
6. Why doesn't the DataStream API implement window methods? Will later versions support them?

Many thanks. We're very optimistic about flink and hope the community keeps growing. Thanks for the hard work!
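As a reference for the pandas-type udf mentioned above, a minimal sketch (the function and its inputs are illustrative); with func_type="pandas" the arguments arrive as pandas.Series, so whole batches are processed at once:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def vec_add(i, j):
    # i and j are pandas.Series of equal length; the result is a Series as well
    return i + j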
Re:Re: Some questions about using pyflink
Hi Xingbo,

I'd like to ask about some details of how sql_query executes. In flink 1.12, does the underlying SQL execution apply predicate-pushdown optimization? From my test results:
1. the pyflink 1.11 connector definition supports a read.query parameter for fetching data, and execution is very fast; presumably that part is handed off to the database;
2. pyflink 1.12 removed the read.query parameter, and when several sources are defined and joined, the job becomes noticeably slow (in pyflink).

Given that, is this overhead also due to Python's language limitations or the IPC cost, or does it come from the underlying design? Thanks!

On 2021-03-16 14:27:22, "Xingbo Huang" wrote:
>Hi,
>
>Two additional answers:
>1. Pandas UDAFs over sliding windows and tumbling windows are already supported on Table [1], and session-window UDAF support will land in 1.13. As for windows on DataStream: for the window types above you can switch to the Table API; custom windows on DataStream will be supported in 1.13.
>
>2. On performance: if you don't use Python UDFs, what actually runs is Java code; Python only plays the role of compiling the JobGraph on the client. So Python sql_update cannot run slower than Java, because the code that actually executes is identical. If you do use Python UDFs, then compared with Java UDFs there is extra IPC overhead, and Python itself is slower than Java code; the gap is currently about 6 to 7x. We keep working on performance, and the goal is to fully catch up with Java code, or even C code.
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
>
>Best,
>Xingbo
>
>xiaoyue wrote on Tue, Mar 16, 2021 at 11:42 AM:
>[...]
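For reference, a vectorized (Pandas) aggregate function in the style of the docs linked as [1]; a minimal sketch, with mean_udaf being an illustrative name:

from pyflink.table import DataTypes
from pyflink.table.udf import udaf

@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def mean_udaf(v):
    # v is a pandas.Series holding all values of the group or window
    return v.mean()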
flink1.12: submitting a python job in Standalone mode fails: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat
flink1.12.2 deployed as a standalone cluster; the job is a pyflink program that connects to a MySQL database and runs a computation:

1. The following dependencies have already been added under /user/local/flink-1.12.2/lib:
   mysql-connector-java-8.0.12.jar,
   flink-connector-jdbc_2.11-1.12.0.jar,
   flink-table-api-java-1.12.0.jar
2. Submit command:
   bin/flink run -py ../test.py -p 8
3. The error is attached below; hoping someone who has been through this deployment can offer a pointer. Thanks!

Traceback (most recent call last):
  File "../test.py", line 57, in <module>
    env.execute('Test')
  File "/usr/local/env/flink1.12_py3_env/flink-1.12.2/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1276, in execute
  File "/usr/local/env/flink1.12_py3_env/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/usr/local/env/flink1.12_py3_env/flink-1.12.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/usr/local/env/flink1.12_py3_env/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o5.execute.
: org.apache.flink.util.FlinkException: Failed to execute job 'Pyflink1.12_Query_Time_Test'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
    at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1277)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:494)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'Source: TableSourceScan(table=[[default_catalog, default_database, TP_GL_DAY, project=[DAY_ID]]], fields=[DAY_ID])': Loading the input/output formats failed:
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:239)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:322)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:276)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:133)
    at org.apache.flink.runtime
Re:Re: flink1.12: submitting a python job in Standalone mode fails: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat
Got it, the problem is solved. Thank you!

On 2021-03-22 16:50:03, "Dian Fu" wrote:
>Take a look at:
>https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/dependency_management.html#java-dependency-in-python-program
>
>flink-connector-jdbc_2.11-1.12.0.jar and mysql-connector-java-8.0.12.jar need to be placed somewhere PyFlink can find them.
>
>On Mon, Mar 22, 2021 at 1:43 PM xiaoyue <18242988...@163.com> wrote:
>[...]
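Following the dependency-management page linked above, the jars can also be declared in the job itself through pipeline.jars; a minimal sketch, with illustrative jar paths:

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())

# ';'-separated list of file:// URLs pointing at the connector and driver jars
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///usr/local/flink-1.12.2/lib/flink-connector-jdbc_2.11-1.12.0.jar;"
    "file:///usr/local/flink-1.12.2/lib/mysql-connector-java-8.0.12.jar")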
pyflink1.12 error: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 0
While executing a pyflink UDAF script I get: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 0. At the moment the UDAF result cannot be sunk. Has anyone passing by hit this problem too? The exception is as follows:

Traceback (most recent call last):
  File "C:/projects/virtual_pyflink1.12/TestScript/local_udaf_logReturn.py", line 114, in <module>
    csv_source_udaf(csv_source)
  File "C:/projects/virtual_pyflink1.12/TestScript/local_udaf_logReturn.py", line 45, in wrapper
    func(*args, **kw)
  File "C:/projects/virtual_pyflink1.12/TestScript/local_udaf_logReturn.py", line 103, in csv_source_udaf
    print(result.to_pandas())
  File "C:\projects\virtual_pyflink1.12\lib\site-packages\pyflink\table\table.py", line 808, in to_pandas
    if batches.hasNext():
  File "C:\projects\virtual_pyflink1.12\lib\site-packages\py4j\java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\projects\virtual_pyflink1.12\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "C:\projects\virtual_pyflink1.12\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o101.hasNext.
: java.lang.RuntimeException: Failed to fetch next result
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:77)
    at org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:115)
    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:355)
    at org.apache.flink.table.runtime.arrow.ArrowUtils$1.hasNext(ArrowUtils.java:644)
    at org.apache.flink.table.runtime.arrow.ArrowUtils$2.hasNext(ArrowUtils.java:666)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to fetch job execution result
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:126)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:103)
    ... 16 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:172)
    ... 18 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
    at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)
    at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
    ... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.
flink1.12.0 python udf job runs fine on the cluster but fails locally: java.lang.RuntimeException: Failed to create stage bundle factory!
I wrote a UDAF with python flink 1.12, and local execution fails with the error below. I've confirmed that apache-flink 1.12.0 is installed in the current py3 environment. Hoping a passing expert can help analyze it. Thanks!

Traceback (most recent call last):
  File "C:/projects/virtual_pyflink1.12/TestScript/udaf_timeWeightedReturn.py", line 199, in udaf_p_case
    env.execute('UDAF_timeWeightReturn_p')
  File "C:\projects\virtual_pyflink1.12\lib\site-packages\pyflink\table\table_environment.py", line 1276, in execute
    return JobExecutionResult(self._j_tenv.execute(job_name))
  File "C:\projects\virtual_pyflink1.12\lib\site-packages\py4j\java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\projects\virtual_pyflink1.12\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "C:\projects\virtual_pyflink1.12\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:996)
    at akka.dispatch.OnComplete.internal(Future.scala:264)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at o
flink1.14: error registering the mysql connector
Registering a MySQL sink connector in flink1.14 fails; I've checked repeatedly and can't find a syntax error. Help!

Code:
env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings Settings = EnvironmentSettings.newInstance().inBatchMode().build();
tEnv = StreamTableEnvironment.create(env, Settings);
String mysqlSink = "create table bulk_index_sink(" +
        " biz_date string, " +
        " dmo_index_code string, " +
        " index_value string, " +
        " primary key(dmo_index_code) not enforced) " +
        " with (" +
        " 'connector' = 'jdbc', " +
        " 'username' = 'root', " +
        " 'password' = 'yss300377@ZT', " +
        " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
        " 'url' = 'jdbc:mysql://192.168.100.104:3306/test?useSSL=False', " +
        " 'table-name' = 'bulk_index_sink')";
tEnv.executeSql(mysqlSink);
tEnv.execute("mysql_sink_test");

Error:
org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "not" at line 1, column 126.
Was expecting one of: "DISABLE" ... "ENABLE" ... "NORELY" ... "NOVALIDATE" ... "RELY" ... "VALIDATE" ... ")" ... "," ...
    at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98)
    at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:195)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
    at com.yss.datamiddle.index.c001.SharpeRatioTest.udfFlatMapTest(SharpeRatioTest.java:175)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "not" at line 1, column 126.

xiao...@ysstech.com
Re: Re: flink1.14: error registering the mysql connector
Hi tony,
The complete code reads from hive, runs flatmap and aggregate operations, and then sinks to mysql. For brevity the udf function definitions in the middle are not fully included. You can use this as a reference; thanks a lot for your help!

Code:
// execution environment
env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings Settings = EnvironmentSettings.newInstance().inBatchMode().build();
tEnv = StreamTableEnvironment.create(env, Settings);

// hive source
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
String confSite = "src\\main\\resources";
String version = "3.1.2";
String defaultDatabase = "fund_analysis";
HiveCatalog hiveCat = new HiveCatalog("hive", defaultDatabase, confSite, confSite, version);
tEnv.registerCatalog("hive", hiveCat);
tEnv.useCatalog("hive");

// SQL that reads from hive
String biz_date = "20211130";
String tblSource = String.format("select " +
        "coalesce(a.rate,0) as yldrate, " +
        "coalesce(c.rate,0) as riskless_yldrate, " +
        "a.ccy_type, " +
        "a.biz_date, " +
        "b.is_exch_dt, " +
        "a.pf_id " +
        "from " +
        "ts_pf_yldrate a " +
        "inner join td_gl_day b on b.dt = a.biz_date " +
        "inner join ts_pf_bm_yldrate c on c.biz_date = a.biz_date and c.pf_id = a.pf_id " +
        "where a.biz_date <= '%s'", biz_date);
Table table = tEnv.sqlQuery(tblSource);

// register the flatmap function
tEnv.createTemporarySystemFunction("RowFlatMap", SharpeRatioFlatMap.class);
// register the aggregate function
tEnv.createTemporarySystemFunction("SharpeRatioAgg", SharpeRatioAggregate.class);

// run the flatmap operation
Table tagTbl = table.flatMap(call("RowFlatMap", $("yldrate"), $("riskless_yldrate"), $("ccy_type"), $("biz_date"), $("is_exch_dt"), $("pf_id"), biz_date));

// switch catalogs and register the table
tEnv.useCatalog("default_catalog");
tEnv.createTemporaryView("tagTable", tagTbl);

// call SharpeRatioAgg to compute the result
Table result = tEnv.sqlQuery(String.format("select '%s' as biz_date, dmo_index_code, SharpeRatioAgg(yldrate, yldrate_riskless, dmo_index_code) as index_value from tagTable group by dmo_index_code", biz_date));
// result.execute().print(); (--> at this point, result prints successfully)

// sink
String mysqlSink = "create table bulk_index_sink(" +
        " biz_date string, " +
        " dmo_index_code string, " +
        " index_value string" +
        ") with (" +
        " 'connector' = 'jdbc', " +
        " 'username' = 'root', " +
        " 'password' = 'xxx', " +
        " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
        " 'url' = 'jdbc:mysql://hadoop104:3306/test?useSSL=False', " +
        " 'table-name' = 'bulk_index_sink')";
tEnv.executeSql(mysqlSink);
result.select("biz_date,dmo_index_code,index_value").insertInto("bulk_index_sink");
tEnv.execute("mysql_sink_test");

xiao...@ysstech.com

From: Tony Wei
Sent: 2022-02-25 14:13
To: user-zh
Subject: Re: flink1.14: error registering the mysql connector
Hi xiaoyue,

Does your test code include any other extra code or configuration? Could you share the complete code and config files?
I tried running the code below in my IDE and it ran successfully.

public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings Settings = EnvironmentSettings.newInstance().inBatchMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, Settings);

    String mysqlSink = "create table bulk_index_sink(" +
            " biz_date string, " +
            " dmo_index_code string, " +
            " index_value string, " +
            " primary key(dmo_index_code) not enforced) " +
            " with (" +
            " 'connector' = 'jdbc', " +
            " 'username' = 'root', " +
            " 'password' = 'yss300377@ZT', " +
            " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
            " 'url' = 'jdbc:mysql://192.168.100.104
Re: Re: flink1.14: error registering the mysql connector
Great, the data now lands in the database successfully. Thank you very much!

xiao...@ysstech.com

From: Tony Wei
Sent: 2022-02-25 14:57
To: user-zh
Subject: Re: Re: flink1.14: error registering the mysql connector
Hi xiaoyue,

It looks like this line is the cause:

tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

You probably need to switch the SqlDialect back to SqlDialect.DEFAULT before executing the sink statements.

best regards,

xiaoyue wrote on Fri, Feb 25, 2022 at 2:36 PM:
> [...]