Hi Zhefu,

Thanks for sharing the details of how you solved the problem; this is a great contribution to the community!

1. On the subscription issue

I'd like to confirm whether you followed [1]. Taking the Chinese user list
(user-zh@flink.apache.org) as an example, you need to send an email to
user-zh-subscr...@flink.apache.org, i.e., add "subscribe" to the original
list address. You will then receive a confirmation email titled "confirm
subscribe to user-zh@flink.apache.org", which you must reply to in order to
complete the subscription.

2. On the JAR conflict issue

The flink-python JAR bundles the core Beam JARs that flink-python depends
on, so I'd like to understand why those Beam-related JARs also exist on
your cluster. In any case, the case you reported is a valuable one: it made
me realize we could optimize PyFlink's dependency on Beam, for example by
relocating the Beam classes. I have created a community improvement JIRA
for this [2].
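
For anyone hit by the same conflict, here is a minimal sketch for spotting
it (the directory list below is an assumption; adjust it to your cluster
layout):

import glob
import os

# Directories that commonly end up on the Flink/Hadoop classpath.
# These paths are assumptions, not an authoritative list.
candidate_dirs = [
    os.path.join(os.environ.get("FLINK_HOME", "/opt/flink"), "lib"),
    "/usr/lib/hadoop/lib",
]

for d in candidate_dirs:
    # Any beam-*.jar found here can clash with the Beam classes
    # bundled inside the flink-python JAR.
    for jar in sorted(glob.glob(os.path.join(d, "beam-*.jar"))):
        print(jar)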

3. On the packaging issue

The Python environment archive uploaded to PyFlink must be portable across
machines, so it indeed cannot contain symlinks. If the environment is
created with virtualenv, you need to add the --always-copy option. Also, if
the cluster machines already have a prepared python3.5+ environment, you
can skip uploading an environment archive entirely and just call
set_python_executable("python3") to tell the cluster which Python
interpreter to use.
Besides virtualenv, conda/miniconda can also be used to create the virtual
environment. The result is much larger, but it is worth considering when
virtualenv does not work for some reason (for example, a .so file that the
source Python interpreter depends on is missing on the cluster).
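
For reference, a minimal sketch of both options against the PyFlink 1.10
API (the archive name and zip layout below are assumptions):

# Build a portable env once, without symlinks, then zip it:
#   $ virtualenv --always-copy venv
#   $ venv/bin/pip install apache-flink
#   $ zip -r venv.zip venv

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Option A: ship the portable virtualenv with the job.
env.add_python_archive("venv.zip")
env.set_python_executable("venv.zip/venv/bin/python")

# Option B: every worker already has python3.5+, so just point at it.
# env.set_python_executable("python3")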

Thanks again for sharing how you resolved the problem and for the feedback!

Best,
Jincheng

[1]
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list
[2] https://issues.apache.org/jira/browse/FLINK-16762


On Tue, Mar 24, 2020 at 9:33 PM Zhefu PENG <pengzf0...@gmail.com> wrote:

> Hi Jincheng,
>
> In the Chinese user mailing list I cannot reply to my own question (I'm
> quite sure I subscribed to the mailing list, but I don't know why or how
> this happens), so I'm replying here. With the help of my colleagues we
> have now resolved the earlier questions. Our feedback:
>
> 1. First, the JAR conflict: we found that the flink-python JAR also
> contains the code of these four jars: beam-runners-core-java-2.15.0.jar,
> beam-runners-direct-java-2.15.0.jar, beam-runners-flink_2.11-2.15.0.jar,
> and beam-sdks-java-core-2.15.0.jar. If the cluster environment itself
> also carries these four jars, a conflict arises and the dependencies
> cannot be found when running the udf.
>
> 2. For certain reasons, the cluster's default Python environment cannot
> be switched to python3, so we called env.add_python_archive in the code
> and used set_python_executable to specify the interpreter. The archive
> must be packaged without symlinks (presumably symlink resolution is not
> supported after extraction); my earlier packaging was wrong, which caused
> the problem.
>
> These two points are the root causes we tracked down, and we can finally
> run udfs successfully in cluster mode. We hope our feedback contributes a
> little to your development and to the guidance you give users. Thanks
> again for the help :)
>
> Best,
> Zhefu
>
> On Fri, Mar 20, 2020 at 11:19 PM Zhefu PENG <pengzf0...@gmail.com> wrote:
>
>>
>> Thanks for the reply! Beam was installed directly via pip install: while
>> running pip install apache-flink I saw it pulls in many dependencies and
>> requires beam 2.15.0, so that is the version I installed. The Flink
>> version is 1.10, not built from source. We are quite stuck at the moment
>> and hope to get some help.
>>
>> One more small question: it seems you did not forward the logs to the
>> mailing list along with the message, and I'd like to attach them. After
>> subscribing, can I simply reply and paste the logs in?
>>
>> Have a nice weekend :) Thanks a lot for the reply!
>>
>> 彭哲夫
>>
>> On Fri, Mar 20, 2020 at 22:34 jincheng sun <sunjincheng...@gmail.com>
>> wrote:
>>
>>> Hi 彭哲夫,
>>>
>>> How did you install Beam, and which version? Flink 1.10 requires
>>> apache-beam 2.15, while master requires apache-beam 2.19.
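>>>
>>> As a quick sanity check, a minimal sketch (assuming apache_beam exposes
>>> the standard __version__ attribute, which recent releases do):
>>>
>>> import apache_beam
>>> print(apache_beam.__version__)  # expect 2.15.x when using Flink 1.10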
>>>
>>> BTW, in order to share your question, I have forwarded it to the
>>> Chinese user mailing list so we can all discuss it together.
>>>
>>> Best,
>>> Jincheng
>>>
>>>
>>> On Fri, Mar 20, 2020 at 5:12 PM Zhefu PENG <pengzf0...@gmail.com> wrote:
>>>
>>>> Hi Jincheng,
>>>>
>>>> Regarding the second point you raised yesterday, I tried two
>>>> approaches: 1. run my pyflink.py in local mode; 2. set up the
>>>> environment on the cluster nodes.
>>>>
>>>> 1. In local mode, we found that the apache-beam package had for some
>>>> reason not been fully installed: the _bz2 module was missing. After
>>>> adding it, the pyflink.py script runs normally, and the udf in it
>>>> works as well.
>>>>
>>>> 2. Building on the successful local run, and following your advice, we
>>>> updated the environment on all worker nodes to python3.6 and installed
>>>> apache-beam and apache-flink. However, running the script with the udf
>>>> in cluster mode still fails, and a Google search turned up no answer
>>>> either. I'm attaching the error log in the hope of getting help (since
>>>> local mode already works, I am omitting the code). Thanks a lot!
>>>>
>>>> Looking forward to your reply,
>>>> 彭哲夫
>>>>
>>>>
>>>> On Thu, Mar 19, 2020 at 11:14 PM Zhefu PENG <pengzf0...@gmail.com> wrote:
>>>>
>>>>> Hi Jincheng:
>>>>>
>>>>> Thank you so much for such a quick and detailed reply!
>>>>>
>>>>> On the first point: following your reply, I added
>>>>> flink-csv-1.10.0-sql-jar.jar to Flink's lib directory and the job now
>>>>> runs. I had already seen and downloaded the first jar (the Kafka
>>>>> connector) from the Kafka demo in your blog (based on flink 1.9), so
>>>>> a small suggestion: perhaps you could note for future users that
>>>>> flink-csv-1.10.0-sql-jar.jar is also required :) Of course I may
>>>>> simply have missed it while reading, but in any case thanks for
>>>>> helping resolve this.
>>>>>
>>>>> On the second point: due to how our team is organized, I currently
>>>>> have no permission to log in to the worker nodes, but I am curious
>>>>> about one thing. I only installed python3.5+ on the machine that
>>>>> launches the script, and everything except udfs works fine (e.g., SQL
>>>>> built-ins such as concat, or simple calls like add_columns()). Should
>>>>> I understand it as: using all of pyflink's features requires
>>>>> python3.5+ across the whole cluster, whereas for the simple features
>>>>> it is enough that the launching machine meets the requirement?
>>>>> Still on the second point: I just reran the script, hoping to
>>>>> reproduce the earlier error log to send to my mentor, but this time a
>>>>> new error appeared:
>>>>> java.lang.NoClassDefFoundError: Could not initialize class
>>>>> org.apache.beam.sdk.options.PipelineOptionsFactory
>>>>>     at
>>>>> org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:173)
>>>>>     at
>>>>> org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractPythonScalarFunctionOperator.java:193)
>>>>>     at
>>>>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:139)
>>>>>     at
>>>>> org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:143)
>>>>>     at
>>>>> org.apache.flink.table.runtime.operators.python.PythonScalarFunctionOperator.open(PythonScalarFunctionOperator.java:73)
>>>>>     at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
>>>>>     at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>>>>>     at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>>>>>
>>>>>
>>>>> I suspect, as you mentioned, that the workers' environments do not
>>>>> meet the requirements. I will ask colleagues to check tomorrow
>>>>> morning and then apply your suggested fixes. I would also appreciate
>>>>> an answer to the question above; I have only just graduated and
>>>>> started working, so my questions may seem rather basic, please bear
>>>>> with me!
>>>>>
>>>>> Thanks again for your reply; I will fix the errors as soon as
>>>>> possible based on your advice.
>>>>> 彭哲夫
>>>>>
>>>>> On Thu, Mar 19, 2020 at 9:08 PM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>
>>>>>> Hi 彭哲夫,
>>>>>>
>>>>>> The likely causes of the problems above are:
>>>>>>
>>>>>> 1. pyflink does not ship the Kafka connector jar or the CSV format
>>>>>> jar by default; you need to add these jars to pyflink's lib
>>>>>> directory:
>>>>>>
>>>>>> $ PYFLINK_LIB=`python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/lib')"`
>>>>>> $ cd $PYFLINK_LIB
>>>>>> $ curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar
>>>>>> $ curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-csv/1.10.0/flink-csv-1.10.0-sql-jar.jar
>>>>>>
>>>>>> 2. A possible cause is that the workers have no python3+ environment,
>>>>>> or that apache-beam is not installed in it. On a worker machine, try
>>>>>> running "python --version" to check the Python version and "pip list"
>>>>>> to check whether apache-beam is present; if it is missing, run:
>>>>>> python -m pip install apache-flink
>>>>>>
>>>>>> Hope this helps; if problems remain, let's keep the discussion going.
>>>>>>
>>>>>> Best,
>>>>>> Jincheng
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Mar 19, 2020 at 8:13 PM Zhefu PENG <pengzf0...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>>
>>>>>>> I came across your blog online and deeply admire your work on
>>>>>>> developing and promoting pyflink. Our team is evaluating Flink for
>>>>>>> business needs, and I wrote a simple demo to try it out, but ran
>>>>>>> into two problems (the second is currently the bigger blocker; for
>>>>>>> the first I adopted a workaround :)):
>>>>>>>
>>>>>>> 1. When I try to write data to Kafka in CSV format, it fails. (From
>>>>>>> the community docs I already learned that Csv() should replace
>>>>>>> OldCsv(), and changed the code accordingly.) Looking at the error, I
>>>>>>> suspected a missing jar (as happened earlier with the JSON format),
>>>>>>> but another document says the csv format should be built-in. For now
>>>>>>> I have worked around it by writing JSON instead.
>>>>>>>
>>>>>>> The error message is:
>>>>>>>
>>>>>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>>>>>> o62.insertInto.
>>>>>>> : org.apache.flink.table.api.NoMatchingTableFactoryException: Could
>>>>>>> not find a suitable table factory for
>>>>>>> 'org.apache.flink.table.factories.SerializationSchemaFactory' in
>>>>>>> the classpath.
>>>>>>>
>>>>>>> Reason: No factory supports all properties.
>>>>>>>
>>>>>>> The matching candidates:
>>>>>>> org.apache.flink.formats.csv.CsvRowFormatFactory
>>>>>>> Unsupported property keys:
>>>>>>> format.fields.#.name
>>>>>>> format.fields.#.data-type
>>>>>>>
>>>>>>> The table registration code is:
>>>>>>> table_env.connect(Kafka()
>>>>>>>                   .version("universal")
>>>>>>>                   .topic(kafka_write_topic)
>>>>>>>                   .property(kafka_server, ','.join(kafka_server_list))
>>>>>>>                   .property(kafka_zookeeper, ','.join(kafka_server_list))) \
>>>>>>>     .with_format(Csv()
>>>>>>>                  .schema(DataTypes.ROW([DataTypes.FIELD("a", DataTypes.STRING()),
>>>>>>>                                         DataTypes.FIELD("b", DataTypes.STRING()),
>>>>>>>                                         DataTypes.FIELD("c", DataTypes.STRING())]))) \
>>>>>>>     .with_schema(Schema()
>>>>>>>                  .field("a", DataTypes.STRING())
>>>>>>>                  .field("b", DataTypes.STRING())
>>>>>>>                  .field("c", DataTypes.STRING())) \
>>>>>>>     .create_temporary_table(table_name_output)
>>>>>>>
>>>>>>> 2. After adopting the workaround, I tried to use a python udf to
>>>>>>> add a custom function. But after defining the udf following the
>>>>>>> steps in the demo and running the job, it times out after a while.
>>>>>>> My guess is that the pyflink-udf-runner.sh script is never launched,
>>>>>>> even though the script can be found inside the dependency
>>>>>>> opt/flink-python_2.11-1.10.0.jar.
>>>>>>> The error message is:
>>>>>>> 2020-03-19 17:02:23,273 INFO
>>>>>>>  
>>>>>>> org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory
>>>>>>>  - Still waiting for startup of environment
>>>>>>> '/matrix/data/hadoop/data4/mapred/usercache/root/appcache/application_1584436506937_1231/python-dist-1e474e56-46a7-4ae6-bc4a-871ba86917a6/pyflink-udf-runner.sh'
>>>>>>> for worker id 1
>>>>>>>
>>>>>>> The relevant code is:
>>>>>>> @udf(input_types=[DataTypes.STRING()],
>>>>>>>      result_type=DataTypes.STRING())
>>>>>>> def if_length_enough(devid):
>>>>>>>     res = devid + " func"
>>>>>>>     return res
>>>>>>>
>>>>>>> table_env.register_function("if_length_enough", if_length_enough)
>>>>>>>
>>>>>>> table_env.from_path(table_name_input) \
>>>>>>>     .select("a, b, if_length_enough(c)") \
>>>>>>>     .insert_into(table_name_output)
>>>>>>>
>>>>>>>
>>>>>>> These two errors have bothered me all afternoon; I would really
>>>>>>> appreciate your help, thank you!
>>>>>>> Looking forward to your reply.
>>>>>>>
>>>>>>> 彭哲夫
>>>>>>>
>>>>>>>
Traceback (most recent call last):
  File "/usr/local/python3/lib/python3.6/runpy.py", line 193, in 
_run_module_as_main
    "__main__", mod_spec)
  File "/usr/local/python3/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/home/tianyongxiao/flink_udp_test.py", line 89, in <module>
    t_env.execute('flink_kafka_test_tyx')
  File 
"/tmp/pyflink/7aa3497f-f4e4-4906-8af0-242dee6fc77d/b864f67a-5c25-4f1d-89c6-03856f0c725dpyflink.zip/pyflink/table/table_environment.py",
 line 907, in execute
  File 
"/tmp/pyflink/7aa3497f-f4e4-4906-8af0-242dee6fc77d/b864f67a-5c25-4f1d-89c6-03856f0c725dpy4j-0.10.8.1-src.zip/py4j/java_gateway.py",
 line 1286, in __call__
  File 
"/tmp/pyflink/7aa3497f-f4e4-4906-8af0-242dee6fc77d/b864f67a-5c25-4f1d-89c6-03856f0c725dpyflink.zip/pyflink/util/exceptions.py",
 line 147, in deco
  File 
"/tmp/pyflink/7aa3497f-f4e4-4906-8af0-242dee6fc77d/b864f67a-5c25-4f1d-89c6-03856f0c725dpy4j-0.10.8.1-src.zip/py4j/protocol.py",
 line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o3.execute.
: java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
57c29a301ce9aed3f07e631073be1e78)
        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
        at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
        at 
org.apache.flink.table.executor.StreamExecutor.execute(StreamExecutor.java:50)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: 57c29a301ce9aed3f07e631073be1e78)
        at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
        at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
        at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
        at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
        at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
        at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
        at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
        at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
        ... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.ServiceConfigurationError: 
org.apache.beam.sdk.options.PipelineOptionsRegistrar: Provider 
org.apache.beam.sdk.options.DefaultPipelineOptionsRegistrar not a subtype
        at java.util.ServiceLoader.fail(ServiceLoader.java:239)
        at java.util.ServiceLoader.access$300(ServiceLoader.java:185)
        at 
java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376)
        at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
        at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableCollection$Builder.addAll(ImmutableCollection.java:415)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet$Builder.addAll(ImmutableSet.java:507)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSortedSet$Builder.addAll(ImmutableSortedSet.java:528)
        at 
org.apache.beam.sdk.util.common.ReflectHelpers.loadServicesOrdered(ReflectHelpers.java:227)
        at 
org.apache.beam.sdk.options.PipelineOptionsFactory$Cache.initializeRegistry(PipelineOptionsFactory.java:1823)
        at 
org.apache.beam.sdk.options.PipelineOptionsFactory$Cache.<init>(PipelineOptionsFactory.java:1817)
        at 
org.apache.beam.sdk.options.PipelineOptionsFactory$Cache.<init>(PipelineOptionsFactory.java:1786)
        at 
org.apache.beam.sdk.options.PipelineOptionsFactory.resetCache(PipelineOptionsFactory.java:542)
        at 
org.apache.beam.sdk.options.PipelineOptionsFactory.<clinit>(PipelineOptionsFactory.java:508)
        at 
org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:173)
        at 
org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractPythonScalarFunctionOperator.java:193)
        at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:139)
        at 
org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:143)
        at 
org.apache.flink.table.runtime.operators.python.PythonScalarFunctionOperator.open(PythonScalarFunctionOperator.java:73)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        at java.lang.Thread.run(Thread.java:748)

org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
        at 
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:87)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
        at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
        at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
        at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
        at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
        at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
