Setting the -pyclientexec option, or running `SET 'python.client.executable'='xxxx';` in the SQL client terminal, both fix the `java.lang.IllegalStateException: Instantiating python function xx failed` error. However, when I then run the UDF normally it still fails, and I have not been able to find the cause of `Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory`.
```
select func1('Chicago');
# console log
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory

# client log
org.apache.flink.table.client.gateway.SqlExecutionException: Error while retrieving result.
        at 
org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:79)
 ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.lang.RuntimeException: Failed to fetch next result
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
 ~[?:?]
        at 
org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75)
 ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.io.IOException: Failed to fetch job execution result
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
 ~[?:?]
        at 
org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75)
 ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
b3dcd20267356a2280d09df4bcd6f440)
        at java.util.concurrent.CompletableFuture.reportGet(Unknown Source) 
~[?:?]
        at java.util.concurrent.CompletableFuture.get(Unknown Source) ~[?:?]
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
 ~[?:?]
        at 
org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75)
 ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: b3dcd20267356a2280d09df4bcd6f440)
        at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown 
Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) 
~[?:?]
        at java.util.concurrent.CompletableFuture.complete(Unknown Source) 
~[?:?]
        at 
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown 
Source) ~[?:?]
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) 
~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) 
~[?:?]
        at java.util.concurrent.CompletableFuture.complete(Unknown Source) 
~[?:?]
        at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown 
Source) ~[?:?]
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) 
~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) 
~[?:?]
        at java.util.concurrent.CompletableFuture.complete(Unknown Source) 
~[?:?]
        at 
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown 
Source) ~[?:?]
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) 
~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) 
~[?:?]
        at java.util.concurrent.CompletableFuture.postFire(Unknown Source) 
~[?:?]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown 
Source) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(Unknown 
Source) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
~[?:?]
        at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
        at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown 
Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) 
~[?:?]
        at java.util.concurrent.CompletableFuture.complete(Unknown Source) 
~[?:?]
        at 
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown 
Source) ~[?:?]
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) 
~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) 
~[?:?]
        at java.util.concurrent.CompletableFuture.complete(Unknown Source) 
~[?:?]
        at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown 
Source) ~[?:?]
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) 
~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) 
~[?:?]
        at java.util.concurrent.CompletableFuture.complete(Unknown Source) 
~[?:?]
        at 
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown 
Source) ~[?:?]
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) 
~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) 
~[?:?]
        at java.util.concurrent.CompletableFuture.postFire(Unknown Source) 
~[?:?]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown 
Source) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(Unknown 
Source) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
~[?:?]
        at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at jdk.internal.reflect.GeneratedMethodAccessor88.invoke(Unknown 
Source) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source) ~[?:?]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
 ~[?:?]
        at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
 ~[?:?]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
 ~[?:?]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
 ~[?:?]
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
 ~[?:?]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
 ~[?:?]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
~[flink-scala_2.12-1.16.0.jar:1.16.0]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
~[flink-scala_2.12-1.16.0.jar:1.16.0]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) 
~[?:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[flink-scala_2.12-1.16.0.jar:1.16.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
~[flink-scala_2.12-1.16.0.jar:1.16.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
~[flink-scala_2.12-1.16.0.jar:1.16.0]
        at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[?:?]
        at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[?:?]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) 
~[?:?]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[?:?]
        at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[?:?]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[?:?]
        at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[?:?]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[?:?]
        at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) ~[?:?]
        at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown 
Source) ~[?:?]
        at java.util.concurrent.ForkJoinPool.scan(Unknown Source) ~[?:?]
        at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) ~[?:?]
        at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) ~[?:?]
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory
        at 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:238)
 ~[flink-python-1.16.0.jar:1.16.0]
        at 
org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57)
 ~[flink-python-1.16.0.jar:1.16.0]
        at 
org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:92)
 ~[flink-python-1.16.0.jar:1.16.0]
        at 
org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:101)
 ~[flink-python-1.16.0.jar:1.16.0]
        at 
org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperator.open(PythonScalarFunctionOperator.java:71)
 ~[flink-python-1.16.0.jar:1.16.0]
        at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
 ~[flink-dist-1.16.0.jar:1.16.0]
        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) 
~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) 
~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) 
~[flink-dist-1.16.0.jar:1.16.0]
        at java.lang.Thread.run(Unknown Source) ~[?:?]
```
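
A note on reading this trace: in the JVM, `NoClassDefFoundError: Could not initialize class X` is reported when the static initialization of X has already failed once in that JVM. The first failure is normally reported as `ExceptionInInitializerError`, together with the real cause. Since `BeamPythonFunctionRunner.open` runs inside the task, that first failure is most likely in the TaskManager log rather than in the client log above. Below is a minimal sketch for pulling it out, assuming the log is plain text. The function name and the `context` parameter are hypothetical, not part of Flink.

```python
from typing import Iterable, List


def first_initializer_failure(lines: Iterable[str], context: int = 20) -> List[str]:
    """Return the first line mentioning ExceptionInInitializerError,
    followed by up to `context` lines after it.

    Returns an empty list when no such line exists.
    """
    buf = list(lines)
    for i, line in enumerate(buf):
        if "ExceptionInInitializerError" in line:
            # Keep the matching line and the lines right after it, which
            # usually hold the "Caused by:" chain with the real cause.
            return buf[i:i + 1 + context]
    return []
```

To use it, pass the lines of the TaskManager log file (for example, `open(path).read().splitlines()`, where `path` is wherever your deployment writes that log) and print the result. An empty result suggests the first failure was logged elsewhere or in an earlier, rotated log file.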

-----------------------------------

On 2022/06/09 01:54:04 RS wrote:
> Hi,
> That was the problem; it works now. I'm not sure why the interpreter for UDFs has to be configured separately.
>
>
>
> Thx
>
>
> On 2022-06-08 13:29:48, "Dian Fu" <di...@gmail.com> wrote:
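> >There are two options that specify a Python interpreter:
> >1) -pyexec: the path of the Python interpreter used to run Python UDFs while the job is executing
> >2) -pyclientexec: the path of the Python interpreter used on the client side when the job is compiled. For details, see:
> >https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/dependency_management/#python-interpreter-of-client
> >
> >Try adding the -pyclientexec option as well.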
> >
> >On Tue, Jun 7, 2022 at 11:24 AM RS <ti...@163.com> wrote:
> >
> >> Hi,
> >>
> >>
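> >> Environment:
> >> - flink-1.14.3, single-node standalone cluster
> >> - the server defaults to python2; python3.6.8 is also installed
> >> - /xxx/bin/python3 belongs to a virtual environment created with python3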
> >>
> >>
> >> I am testing a PyFlink UDF with sql-client. I defined a custom function f1 in /xxx/p.py.
> >> Start command:
> >> ./bin/sql-client.sh -pyfs file:///xxx/p.py -pyexec /xxx/bin/python3
> >> The -pyexec option sets the Python to use to python3.
> >>
> >>
> >> Running the statements fails with the following error:
> >> Flink SQL> create temporary function fun1 as 'p.f1' language python;
> >> [INFO] Execute statement succeed.
> >> Flink SQL> select fun1('a',1,'s');
> >> Traceback (most recent call last):
> >>   File "/usr/lib64/python2.7/runpy.py", line 151, in _run_module_as_main
> >>     mod_name, loader, code, fname = _get_module_details(mod_name)
> >>   File "/usr/lib64/python2.7/runpy.py", line 101, in _get_module_details
> >>     loader = get_loader(mod_name)
> >>   File "/usr/lib64/python2.7/pkgutil.py", line 464, in get_loader
> >>     return find_loader(fullname)
> >>   File "/usr/lib64/python2.7/pkgutil.py", line 474, in find_loader
> >>     for importer in iter_importers(fullname):
> >>   File "/usr/lib64/python2.7/pkgutil.py", line 430, in iter_importers
> >>     __import__(pkg)
> >>   File "/home/flink-1.14.3/opt/python/pyflink.zip/pyflink/__init__.py",
> >> line 26, in <module>
> >> RuntimeError: Python versions prior to 3.6 are not supported for PyFlink
> >> [sys.version_info(major=2, minor=7, micro=5, releaselevel='final',
> >> serial=0)].
> >> [ERROR] Could not execute SQL statement. Reason:
> >> java.lang.IllegalStateException: Instantiating python function 'p.f1'
> >> failed.
> >>
> >>
> >> The error shows that python2 is being used, not the python3 set in the option. How do I make -pyexec take effect?
> >>
> >>
> >> Thx
>
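
For the interpreter mix-up in the quoted thread, the paths given to -pyexec and -pyclientexec can each be checked before starting sql-client. The sketch below asks an interpreter for its own version and compares it against the 3.6 minimum named in the quoted `RuntimeError`. The helper names are hypothetical, and the minimum is taken from that error message for flink-1.14.3; other Flink versions may differ.

```python
import subprocess
from typing import Tuple

# Minimum taken from the quoted error:
# "Python versions prior to 3.6 are not supported for PyFlink"
MIN_VERSION = (3, 6)


def interpreter_version(executable: str) -> Tuple[int, int]:
    """Run `executable` and return its (major, minor) version."""
    # The one-liner is valid in both Python 2 and Python 3.
    snippet = "import sys; print('%d.%d' % sys.version_info[:2])"
    result = subprocess.run(
        [executable, "-c", snippet],
        check=True,
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    major, minor = result.stdout.strip().split(".")
    return int(major), int(minor)


def is_supported(executable: str) -> bool:
    """True if the interpreter meets the assumed PyFlink minimum."""
    return interpreter_version(executable) >= MIN_VERSION
```

Calling `is_supported("/xxx/bin/python3")` and `is_supported("python")` on the client machine shows which of the two would pass. In the environment described in the thread, the default `python` is 2.7 and would be rejected.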
