Re: Error when running a UDF defined in pyflink
Hi,

Since you did not provide detailed information about the job, I can only go by the error itself. It shows that the exception is thrown by the Python UDF you are using. More specifically, the string result returned by your Python UDF failed to deserialize on the Java side. Please check the corresponding Python UDF.

Best,
Xingbo

On Wed, Dec 16, 2020 at 9:42 AM, Leopard wrote:

> pyflink 1.11.1
>
> Fail to run sql command: SELECT driverStatus,userId,latitude,locTime,longitude,city_code,ad_code
> ,geo_to_h3(latitude,longitude,7) as h3_hash,geo_to_numpy_int_h3(latitude,longitude,7) as h3_code
> FROM lbs_trace CROSS JOIN UNNEST(datas),lateral table(split_json(expandInfo)) as T(city_code,ad_code)
> java.io.IOException: Fail to run stream sql job
>     at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:172)
>     at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:105)
>     at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89)
>     at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:494)
>     at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:257)
>     at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:151)
>     at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:111)
>     at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
>     at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
>     at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:846)
>     at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:738)
>     at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
>     at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
>     at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: aa71b252e058bf6b0f5ec15b23d86adc)
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1719)
>     at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
>     at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:161)
>     ... 16 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: aa71b252e058bf6b0f5ec15b23d86adc)
>     at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>     at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
>     at java.util.concurrent.CompletableFuture$Completion.run(Completab
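The advice in the reply (check what the Python UDF actually returns) can be tried locally before the job is submitted. The sketch below is only an illustration under stated assumptions: the real UDF code is not shown in this thread, so `returns`, `cell_as_text` and `cell_as_bytes` are hypothetical names, and the function bodies are stand-ins rather than real h3 calls. It uses plain Python with no pyflink import. The idea is to wrap the function that will later be registered as a UDF and fail fast when its return value is not of the Python type matching the declared result type (for example `str` for a string column).

```python
import functools


def returns(expected_type):
    """Hypothetical helper: raise TypeError if the wrapped function
    returns a value whose type is not exactly `expected_type`."""
    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args):
            result = func(*args)
            # None is allowed because a SQL result may be NULL.
            # The check is strict on purpose, so that subclasses and
            # look-alike types (bytes, numpy scalars) are rejected too.
            if result is not None and type(result) is not expected_type:
                raise TypeError(
                    "{} returned {}, expected {}".format(
                        func.__name__,
                        type(result).__name__,
                        expected_type.__name__,
                    )
                )
            return result
        return wrapper
    return decorate


@returns(str)
def cell_as_text(latitude, longitude, resolution):
    # Stand-in body (assumption): returns a real Python str.
    return "{:.3f}:{:.3f}:{}".format(latitude, longitude, resolution)


@returns(str)
def cell_as_bytes(latitude, longitude, resolution):
    # Stand-in for a buggy UDF (assumption): returns bytes, not str.
    text = "{:.3f}:{:.3f}:{}".format(latitude, longitude, resolution)
    return text.encode("utf-8")
```

Calling `cell_as_text(39.9, 116.4, 7)` returns a `str`, while `cell_as_bytes(39.9, 116.4, 7)` raises `TypeError` in the Python process, which is easier to read than a deserialization failure reported from the Java side. Whether a type mismatch is the actual cause here cannot be confirmed from the truncated trace.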
Error when running a UDF defined in pyflink
pyflink 1.11.1

Fail to run sql command: SELECT driverStatus,userId,latitude,locTime,longitude,city_code,ad_code
,geo_to_h3(latitude,longitude,7) as h3_hash,geo_to_numpy_int_h3(latitude,longitude,7) as h3_code
FROM lbs_trace CROSS JOIN UNNEST(datas),lateral table(split_json(expandInfo)) as T(city_code,ad_code)
java.io.IOException: Fail to run stream sql job
    at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:172)
    at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:105)
    at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:494)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:257)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:151)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:111)
    at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:846)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:738)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
    at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
    at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: aa71b252e058bf6b0f5ec15b23d86adc)
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1719)
    at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
    at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:161)
    ... 16 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: aa71b252e058bf6b0f5ec15b23d86adc)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    ... 3 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:114)
    ... 19 more
Caused by: org.apache.flink.runtime.Job