Re: Re: Re: pyflink run error: Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable
Thanks! After several rounds of debugging it turned out the venv package I had built was broken. It is fixed now and the job runs on the cluster. Thanks!

> -- Original Message --
> From: "Dian Fu"
> Date: 2021-04-15 10:32:49
> To: user-zh, magichu...@88.com
> Subject: Re: Re: pyflink run error: Function class 'class
> org.apache.flink.table.functions.python.PythonScalarFunction' is not
> serializable
>
> Could you first try running the job in local mode to narrow things down? Judging from the error, it fails at compile time, so it should be reproducible on the machine you submit from.
>
> On Thu, Apr 15, 2021 at 10:24 AM magichuang wrote:
>
> > Hi, the cluster has 6 CentOS 7.8 cloud servers; 5 of them run java version "1.8.0_202" and one runs java version "16" 2021-03-16. Could that matter? I submitted from a "1.8.0_202" machine.
> >
> > Submit command: /opt/flink-1.11.2/bin/flink run -m yarn-cluster -ynm traffic -ys 1 -ytm 1024m -p 1 -py traffic.py
> >
> > [earlier messages in this thread snipped; see the full original message and stack trace below]
Re: Re: pyflink run error: Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable
Hi, the cluster has 6 CentOS 7.8 cloud servers; 5 of them run java version "1.8.0_202" and one runs java version "16" 2021-03-16. Could that matter? I submitted from a "1.8.0_202" machine.

Submit command: /opt/flink-1.11.2/bin/flink run -m yarn-cluster -ynm traffic -ys 1 -ytm 1024m -p 1 -py traffic.py

> -- Original Message --
> From: "Dian Fu"
> Date: 2021-04-14 23:11:57
> To: user-zh
> Subject: Re: pyflink run error: Function class 'class
> org.apache.flink.table.functions.python.PythonScalarFunction' is not
> serializable
>
> Which JDK version are you on? This looks like a Java environment problem. There is a similar question here [1]; see whether it helps.
>
> [1]
> https://stackoverflow.com/questions/41265266/how-to-solve-inaccessibleobjectexception-unable-to-make-member-accessible-m
>
> On Wed, Apr 14, 2021 at 4:58 PM magichuang wrote:
>
> > [original message and stack trace snipped; see the full message below in this thread]
pyflink run error: Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable
Flink version: 1.11.2, Python 3.6, apache-flink==1.11.2, running Flink on YARN in per-job mode.

The program is written with pyflink: it reads from Kafka, uses a UDF (with the third-party package IPy) to check whether an IP falls within an IP range, and writes the result back to Kafka.

Main code:

t_env.get_config().get_configuration().set_string('taskmanager.memory.task.off-heap.size', '128m')
t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar")
t_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar")

t_env.add_python_archive("venv.zip")
t_env.get_config().set_python_executable("venv.zip/venv/bin/python")

@udf(input_types=[DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.INT())
def judge_ip(src_ip, dst_ip):
    import IPy
    ...

t_env.register_function('judge_ip', judge_ip)

Main error output:

Traceback (most recent call last):
  File "traffic-tuple-sf.py", line 59, in <module>
    t_env.register_function('judge_ip', judge_ip)
  File "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 876, in register_function
  File "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o5.registerFunction.
: org.apache.flink.table.api.ValidationException: Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable. Make sure that the class is self-contained (i.e. no references to outer classes) and all inner fields are serializable as well.
    at org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:349)
    at org.apache.flink.table.functions.UserDefinedFunctionHelper.prepareInstance(UserDefinedFunctionHelper.java:204)
    at org.apache.flink.table.catalog.FunctionCatalog.registerTempSystemScalarFunction(FunctionCatalog.java:383)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerFunction(TableEnvironmentImpl.java:357)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:567)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @1311d9fb
    at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:357)
    at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
    at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:177)
    at java.base/java.lang.reflect.Field.setAccessible(Field.java:171)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:104)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
    at org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:346)
    ... 14 more

Could someone take a look at what is wrong here and how to fix it? Thanks!
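As an aside about the UDF in the message above: the body is elided there, but the range check itself does not actually need the third-party IPy package. The stdlib `ipaddress` module can do the same thing, which also sidesteps shipping IPy to the workers. A minimal sketch of the plain-Python core (the monitored network `203.0.113.0/24` is an illustrative placeholder, not from the original job; a pyflink `@udf` could wrap this function directly):

```python
import ipaddress

# Placeholder network, not from the original job.
MONITORED_NET = ipaddress.ip_network("203.0.113.0/24")

def judge_ip(src_ip: str, dst_ip: str) -> int:
    """Return 1 if either endpoint falls inside the monitored range, else 0."""
    try:
        in_range = any(
            ipaddress.ip_address(ip) in MONITORED_NET
            for ip in (src_ip, dst_ip)
        )
    except ValueError:  # malformed address string
        return 0
    return 1 if in_range else 0

print(judge_ip("203.0.113.7", "198.51.100.1"))  # -> 1
print(judge_ip("198.51.100.1", "198.51.100.2"))  # -> 0
```

Because this uses only the standard library, the worker-side Python environment needs no extra packages for the check itself.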
Re: Re: flink on yarn startup failure
Thanks a lot!!! So that was it — I assumed -s was the abbreviation for slots... Thanks for the explanation; the job submits fine now.

> -- Original Message --
> From: "Yang Wang"
> Date: 2020-12-24 11:01:46
> To: user-zh
> Subject: Re: flink on yarn startup failure
>
> There is a mistake in your command: flink run -m yarn-cluster -ynm traffic -s 2 -p 2 -ytm 1024 -py traffic.py
>
> It should be -ys, not -s.
> -s restores from a savepoint, which is why the error complains that the savepoint directory cannot be found.
>
> Best,
> Yang
>
> On Wed, Dec 23, 2020 at 8:29 PM magichuang wrote:
>
> > [original message and stack trace snipped; see the full message below in this thread]
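The one-flag fix above can be shown concretely. A purely illustrative sketch contrasting the broken and corrected invocations as argument lists (in practice you would just edit the command line):

```python
# In the Flink 1.11 CLI, `-s <path>` restores from a savepoint, while
# `-ys <n>` sets the number of slots per YARN TaskManager.
wrong = ["flink", "run", "-m", "yarn-cluster", "-ynm", "traffic",
         "-s", "2", "-p", "2", "-ytm", "1024", "-py", "traffic.py"]

fixed = list(wrong)
fixed[fixed.index("-s")] = "-ys"  # slots per TaskManager, not a savepoint path

print(" ".join(fixed))
```

With `-s 2`, the CLI treated the literal string "2" as a savepoint pointer, which is exactly what the FileNotFoundException below ("Cannot find checkpoint or savepoint file/directory '2'") complains about.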
flink on yarn startup failure
Machines: three 32C64G CentOS 7.8 servers, with a CDH cluster already deployed on them.
Flink version: 1.11.2, set up as a cluster across the three machines; the Hadoop cluster was built with CDH.

Submit command: flink run -m yarn-cluster -ynm traffic -s 2 -p 2 -ytm 1024 -py traffic.py

The program is written with pyflink: it reads from Kafka, aggregates per-minute data with a tumbling window, and writes back to Kafka.

The program runs fine in local mode, but submitting in per-job mode always fails. The official example flink run -m yarn-cluster examples/batch/WordCount.jar does produce output, so is this a YARN problem or a problem in my program?

Main error output:

Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate JobManager.
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) ~[?:1.8.0_202]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    ... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate JobManager.
    [same four frames as above]
    ... 4 more
Caused by: java.io.FileNotFoundException: Cannot find checkpoint or savepoint file/directory '2' on file system 'file'.
    at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:243) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:110) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1394) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:300) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:253) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) ~[?:1.8.0_202]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    ... 4 more
2020-12-23 20:12:46,459 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:16109

The full log is here: https://note.youdao.com/ynoteshare1/index.html?id=25f1af945e277057c2251e8f60d90f8a=note (it may load slowly; please wait a moment).

Best,
MagicHuang
Re: Re: flink clickhouse connector
Did you write yours in Java or pyflink? Mine is written in pyflink, so I need a jar — do you happen to have one? I just installed Maven locally and am building the connector, but it keeps downloading lots of dependencies...

> -- Original Message --
> From: "guoliubi...@foxmail.com"
> Date: 2020-12-17 19:36:55
> To: user-zh
> Subject: Re: flink clickhouse connector
>
> I also write data to ClickHouse. With the official or a third-party JDBC driver (I use https://github.com/blynkkk/clickhouse4j) plus JdbcSink, the writes just work — no separate connector needed.
>
> guoliubi...@foxmail.com
>
> From: magichuang
> Date: 2020-12-17 18:41
> To: user-zh
> Subject: flink clickhouse connector
>
> > [original message and stack trace snipped; see the full message below in this thread]

--

Best,
MagicHuang
flink clickhouse connector
hi, has anyone here written data from Flink to ClickHouse? I am using the flink-connector from https://help.aliyun.com/document_detail/185696.html?spm=a2c4g.11186623.6.606.6222693bubxXzX but it fails at runtime:

Caused by: java.io.IOException: unable to establish connection to ClickHouse
    at com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:79)
    at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:73)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: table `default`.`traffic` is not a Distributed table
    at com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.establishShardConnections(ClickHouseShardOutputFormat.java:96)
    at com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:76)
    ... 12 more

I did create the traffic table in ClickHouse; Flink version is 1.11.

Has anyone got this working? Could you share your connector?

Best,
MagicHuang
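The second "Caused by" above is the real complaint: this connector writes shard by shard and expects the sink table to use ClickHouse's Distributed engine on top of a local table, while `default`.`traffic` was apparently created as a plain local table. A hedged sketch of what the DDL pair might look like (cluster name `my_cluster`, columns, and sharding key are illustrative assumptions, not from the thread):

```python
# ClickHouse DDL held as strings for illustration; run these in clickhouse-client.
local_ddl = """
CREATE TABLE default.traffic_local ON CLUSTER my_cluster (
    src_ip String,
    dst_ip String,
    bytes  UInt64
) ENGINE = MergeTree() ORDER BY (src_ip)
"""

# The Distributed table `default.traffic` fans writes out to traffic_local
# on each shard; this is the table the connector should point at.
distributed_ddl = """
CREATE TABLE default.traffic ON CLUSTER my_cluster
AS default.traffic_local
ENGINE = Distributed(my_cluster, default, traffic_local, rand())
"""

print(distributed_ddl)
```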
Re: Re: pyflink: permission error while installing third-party library dependencies
Hi, when submitting the pyflink job I pass -pyarch venv.zip -pyexec venv.zip/venv/bin/python. The job uses a UDF, and IPy is a third-party library. Installing it directly failed earlier with a permission error, so I am now trying the virtual-environment approach together with set_python_requirements.

Screenshot: https://s3.ax1x.com/2020/12/17/r8J6AI.png

Previously, with the default Python environment, installation failed with permission denied. I just resubmitted with the explicit Python environment plus set_python_requirements: no more permission error, and the job reached YARN and got an application id — I assume that means the install into the virtualenv succeeded?

But now there is a new error: Caused by: java.net.ConnectException: Connection refused
Error screenshot: https://s3.ax1x.com/2020/12/17/r8YJKg.png

My CDH environment: Hadoop is deployed on the same three machines cdh001, cdh002, cdh003. Flink version 1.11. Cluster configuration:

masters: cdh001:8081, cdh002:8081
workers: cdh001, cdh002, cdh003

The error happens while connecting to cdh002:31331, but while the job is on YARN, nothing is listening on that port on cdh002 — netstat -ntlp | grep 31331 is empty. What could be causing this?

Best,
Magichuang

> -- Original Message --
> From: "Xingbo Huang"
> Date: 2020-12-16 12:42:48
> To: user-zh
> Subject: Re: pyflink: permission error while installing third-party library dependencies
>
> Hi,
>
> By default it is the environment that python points to on each machine; you can also point at a different Python environment via -pyexec.
>
> Best,
> Xingbo
>
> On Tue, Dec 15, 2020 at 8:02 PM magichuang wrote:
>
> > [earlier messages snipped; see the full messages below in this thread]
Re: Re: Question: does pyflink's sink support a redis connector?
hi, can https://github.com/apache/bahir-flink/tree/master/flink-connector-redis be packaged as a jar and then used from pyflink? I am not familiar with Java, and that page only explains usage for Java and Scala.

Best,
MagicHuang

> -- Original Message --
> From: "Dian Fu"
> Date: 2020-12-17 10:16:13
> To: user-zh, hepei...@qq.com
> Subject: Re: Question: does pyflink's sink support a redis connector?
>
> Thanks for Xingbo's reply; one small addition: every connector supported by Table & SQL can be used from PyFlink.
>
> There is no redis connector in the Flink codebase itself, but this one should work too: https://github.com/apache/bahir-flink/tree/master/flink-connector-redis
>
> On how to use connectors from PyFlink, see the docs: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html
>
> > On Dec 17, 2020, at 9:52 AM, Xingbo Huang wrote:
> >
> > Hi,
> >
> > As far as I know, Flink does not provide official support for a redis connector [1]; you would need to define your own redis connector against the provided interfaces — see the docs [2] on how to write a custom connector.
> >
> > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/
> > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html
> >
> > Best,
> > Xingbo
> >
> >> On Dec 17, 2020, at 9:33 AM, 消息室 wrote:
> >>
> >> Hello: our project team plans to use pyflink, and I have had the pleasure of reading your blog. Does the sink in the current pyflink 1.12.0 release support a redis connector? Thanks!
> >> If not, what approach would you suggest?
Re: pyflink: permission error while installing third-party library dependencies
Looking at the error again: Flink did upload requirements.txt and cached_dir to HDFS first, because at submit time both

/yarn/nm/usercache/root/appcache/application_1608026509770_0001/flink-dist-cache-f2899798-cfd4-42b9-95d3-204e7e34b943/418cd5a449fd6ec4b61a3cc2dca4ea08/requirements.txt

and

/yarn/nm/usercache/root/appcache/application_1608026509770_0001/flink-dist-cache-f2899798-cfd4-42b9-95d3-204e7e34b943/418cd5a449fd6ec4b61a3cc2dca4ea08/cached_dir

exist on the machine. They only disappear (along with the whole /yarn/nm/usercache/root/appcache/application_1608026509770_0001 folder) once the job dies, so HDFS does not seem to be the problem.

So my question is: when Flink pulls in external Python dependencies and installs them from the offline package, where does it install them to?

The error message is: Error [Errno 13] Permission denied: '' while executing command python setup.py egg_info

i.e. the permission error is raised during python setup.py.

Could someone take a look? Thanks!

> -- Original Message --
> From: magichuang
> Date: 2020-12-15 14:15:04
> To: user-zh
> Subject: pyflink: permission error while installing third-party library dependencies
>
> > [original message snipped; see the full message below in this thread]
pyflink: permission error while installing third-party library dependencies
Hi all, the job runs fine locally with python demo.py, but fails when submitted to the cluster.
Flink version: 1.11, Flink on YARN cluster deployment, per-job submission, three machines.

Submit command: flink run -m yarn-cluster -ynm demo -ys 2 -ytm 2048 -p 2 -py demo.py

Code screenshot: https://s3.ax1x.com/2020/12/15/rKIwE6.png
Error screenshot: https://s3.ax1x.com/2020/12/15/rKIlNT.png

requirements.txt: IPy==1.0
cache_dir: IPy-1.00.tar.gz

Custom udf code:

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def judge_ip(ip):
    import IPy
    if ip in IPy.IP('192.168.112.0/28'):
        return 'in'
    return 'out'

Best,
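For reference, the two dependency-shipping mechanisms that come up across this thread can be written as the following configuration fragment (not runnable on its own — it assumes a pyflink 1.11 TableEnvironment already bound to `t_env`, and the file paths are illustrative):

```python
# Option 1: let PyFlink pip-install from an offline cache into a per-job dir.
# requirements.txt contains "IPy==1.0"; cache_dir holds IPy-1.00.tar.gz.
t_env.set_python_requirements("requirements.txt", "cache_dir")

# Option 2: ship a pre-built virtualenv and run the workers with its own
# interpreter, so nothing gets installed on the cluster machines at all.
t_env.add_python_archive("venv.zip")
t_env.get_config().set_python_executable("venv.zip/venv/bin/python")
```

Option 2 is the approach that, per the later messages in this archive, eventually resolved the permission problem.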
pyflink udf dependency problem
Hi all, I just changed the cluster's Flink configuration to python.client.executable: /usr/bin/python3.6 and made python3.6 the default Python on all three machines.

Flink version: 1.11, Flink on YARN cluster, jobs submitted in per-job mode.
Submit command: flink run -m yarn-cluster -ytm 2048 -s 2 -p 2 -py traffic.py
Screenshot:

The error looks like a directory permission problem. The dependencies were first installed offline into cached_dir and then submitted; the Flink cluster was set up as the root user, and the job is also submitted as root.
Screenshot:
pyflink udf dependency problem
Hi all, I just changed the cluster's Flink configuration to python.client.executable: /usr/bin/python3.6 and made python3.6 the default Python on all three machines.

Flink version: 1.11, Flink on YARN cluster, jobs submitted in per-job mode.
Submit command: flink run -m yarn-cluster -ytm 2048 -s 2 -p 2 -py traffic.py

After submitting, a new problem appeared. The error looks like a directory permission problem; the dependencies were first installed offline into cached_dir and then submitted.
pyflink udf dependency reference problem
When using a pyflink udf that references a third-party dependency, I ran into the following problem. Running directly with python app.py works fine and produces results, but submitting to the cluster does not.

Flink version: 1.11, Flink on YARN cluster deployment, jobs submitted in per-job mode; the cluster has three machines and I submit from one of them.

The code screenshot is below.

Could someone take a look at what causes the error above? Do I need to set up cache_dir on the other two machines in advance? From the logs, Flink appears to try creating a temporary folder on HDFS to hold the dependency tar.gz file.

Best,
Re: Re: Flink SQL: how to give each job its own memory configuration
Hi all, I have been thinking about this recently too. I deploy Flink on YARN, and every per-job submission specifies -ytm 2048, yet once the job is running, the available YARN memory drops by 3 GB rather than 2 GB — the actual usage is 1 GB more than what I asked for. If every SQL job runs on its own, wouldn't that waste resources?

Also, when writing a program in SQL, within a single job that has multiple sources, transformations, and sinks, is it possible to specify the slot count per SQL statement?

Best,

> -- Original Message --
> From: "Kyle Zhang"
> Date: 2020-12-14 09:25:59
> To: user-zh@flink.apache.org
> Subject: Re: Flink SQL: how to give each job its own memory configuration
>
> How about running one SQL job per cluster?
>
> On Mon, Dec 14, 2020 at 8:42 AM yinghua...@163.com wrote:
>
> > A Flink job can specify the JobManager and TaskManager memory via parameters at submit time, but when executing SQL, how do you set the memory for each job — do they all read the same flink-conf.yaml?
> > The memory settings in
> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/config.html#memory-configuration
> > are all based on flink-conf.yaml, which is a global configuration; I could not find a way to configure memory independently per SQL job.
> >
> > yinghua...@163.com