Re: Re: Re: pyflink run error: Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable

2021-04-15 Thread magichuang
Thanks! After several rounds of debugging it turned out the venv package I had built was the problem. It's fixed now and the job runs on the cluster. Thanks!
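
For anyone who hits the same thing, a packaging recipe that commonly produces a working venv.zip for PyFlink 1.11 (a sketch assuming virtualenv is available; the thread does not record which packaging step was actually broken here):

python -m pip install virtualenv
virtualenv --always-copy venv
venv/bin/pip install apache-flink==1.11.2 IPy
zip -r venv.zip venv

The archive is then shipped exactly as in the quoted code below, via t_env.add_python_archive("venv.zip") and set_python_executable("venv.zip/venv/bin/python").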


> -- Original message --
> From: "Dian Fu" 
> Sent: 2021-04-15 10:32:49
> To: user-zh ,magichu...@88.com
> Cc:
> Subject: Re: Re: pyflink run error: Function class 'class 
> org.apache.flink.table.functions.python.PythonScalarFunction' is not 
> serializable
>
> Could you first try running it in local mode to narrow things down? Judging from the error, it fails at compile time, so you should be able to reproduce it on the machine you submit the job from.
>
> On Thu, Apr 15, 2021 at 10:24 AM magichuang wrote:
>
> > Hi, the cluster has 6 CentOS 7.8 cloud servers. On 5 of them the java version is "1.8.0_202", and on 1 of them it is java
> > version "16" 2021-03-16. Does that matter? I submitted from a "1.8.0_202" machine.
> >
> > Submit command: /opt/flink-1.11.2/bin/flink run -m yarn-cluster -ynm traffic -ys 1
> > -ytm 1024m -p 1 -py traffic.py
> >
> >
> >
> >
> > > -- Original message --
> > > From: "Dian Fu"
> > > Sent: 2021-04-14 23:11:57
> > > To: user-zh
> > > Cc:
> > > Subject: Re: pyflink run error: Function class 'class
> > org.apache.flink.table.functions.python.PythonScalarFunction' is not
> > serializable
> > >
> > > What is your JDK version? This looks like a Java environment problem. Here is a similar issue [1]; see if it helps.
> > >
> > > [1]
> > >
> > https://stackoverflow.com/questions/41265266/how-to-solve-inaccessibleobjectexception-unable-to-make-member-accessible-m
> > >
> > > On Wed, Apr 14, 2021 at 4:58 PM magichuang wrote:
> > >
> > > > Flink version: 1.11.2, Python version 3.6, apache-flink==1.11.2, running Flink on
> > > > YARN in per-job mode
> > > >
> > > > The program is written with pyflink: it reads from Kafka, uses a UDF to
> > check whether an IP falls inside an IP range (the third-party package is IPy), and then writes back to Kafka
> > > >
> > > >
> > > >
> > > >
> > > > Main code
> > > >
> > > >
> > t_env.get_config().get_configuration().set_string('taskmanager.memory.task.off-heap.size',
> > > > '128m')
> > > >
> > > > t_env.get_config().get_configuration().set_string("pipeline.jars",
> > > >
> > "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
> > > >
> > > >
> > t_env.get_config().get_configuration().set_string("pipeline.classpaths",
> > > >
> > "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
> > > >
> > > >
> > > >
> > > >
> > > > t_env.add_python_archive("venv.zip")
> > > >
> > > > t_env.get_config().set_python_executable("venv.zip/venv/bin/python")
> > > >
> > > >
> > > >
> > > >
> > > > @udf(input_types=[DataTypes.STRING(), DataTypes.STRING()],
> > > > result_type=DataTypes.INT())
> > > >
> > > > def judge_ip(src_ip, dst_ip):
> > > >     import IPy
> > > >     ...
> > > >
> > > > t_env.register_function('judge_ip', judge_ip)
> > > >
> > > >
> > > >
> > > > Below is the main error
> > > >
> > > > Traceback (most recent call last):
> > > >
> > > > File "traffic-tuple-sf.py", line 59, in <module>
> > > >
> > > > t_env.register_function('judge_ip', judge_ip)
> > > >
> > > > File
> > > >
> > "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/table_environment.py",
> > > > line 876, in register_function
> > > >
> > > > File
> > > >
> > "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> > > > line 1286, in __call__
> > > >
> > > > File
> > > > "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py",
> > line
> > > > 147, in deco
> > > >
> > > > File
> > > > "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
> > line
> > > > 328, in get_return_value
> > > >
> > > > py4j.protocol.Py4JJavaError: An error occurred while calling
> > > > o5.registerFunction.
> > > >
> > > > : org.apache.flink.table.api.ValidationException: Function class 'class
> > > > org.apache.flink.table.functions.python.PythonScalarFunction' is not
> > > > serializable. Make sure that the class is self-contained (i.e. no
> > > > references to outer classes) and all inner fields are serializable as
> > well.
> > > >
> >

Re: Re: pyflink run error: Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable

2021-04-14 Thread magichuang
Hi, the cluster has 6 CentOS 7.8 cloud servers. On 5 of them the java version is "1.8.0_202", and on 1 of them it is java version
"16" 2021-03-16. Does that matter? I submitted from a "1.8.0_202" machine.

Submit command: /opt/flink-1.11.2/bin/flink run -m yarn-cluster -ynm traffic -ys 1 -ytm 
1024m -p 1 -py traffic.py




> -- Original message --
> From: "Dian Fu" 
> Sent: 2021-04-14 23:11:57
> To: user-zh 
> Cc:
> Subject: Re: pyflink run error: Function class 'class 
> org.apache.flink.table.functions.python.PythonScalarFunction' is not 
> serializable
>
> What is your JDK version? This looks like a Java environment problem. Here is a similar issue [1]; see if it helps.
>
> [1]
> https://stackoverflow.com/questions/41265266/how-to-solve-inaccessibleobjectexception-unable-to-make-member-accessible-m
>
> On Wed, Apr 14, 2021 at 4:58 PM magichuang wrote:
>
> > Flink version: 1.11.2, Python version 3.6, apache-flink==1.11.2, running Flink on
> > YARN in per-job mode
> >
> > The program is written with pyflink: it reads from Kafka, uses a UDF to check whether an IP falls inside an IP range (the third-party package is IPy), and then writes back to Kafka
> >
> >
> >
> >
> > Main code
> >
> > t_env.get_config().get_configuration().set_string('taskmanager.memory.task.off-heap.size',
> > '128m')
> >
> > t_env.get_config().get_configuration().set_string("pipeline.jars",
> > "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
> >
> > t_env.get_config().get_configuration().set_string("pipeline.classpaths",
> > "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
> >
> >
> >
> >
> > t_env.add_python_archive("venv.zip")
> >
> > t_env.get_config().set_python_executable("venv.zip/venv/bin/python")
> >
> >
> >
> >
> > @udf(input_types=[DataTypes.STRING(), DataTypes.STRING()],
> > result_type=DataTypes.INT())
> >
> > def judge_ip(src_ip, dst_ip):
> >     import IPy
> >     ...
> >
> > t_env.register_function('judge_ip', judge_ip)
> >
> >
> >
> > Below is the main error
> >
> > Traceback (most recent call last):
> >
> > File "traffic-tuple-sf.py", line 59, in <module>
> >
> > t_env.register_function('judge_ip', judge_ip)
> >
> > File
> > "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/table_environment.py",
> > line 876, in register_function
> >
> > File
> > "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> > line 1286, in __call__
> >
> > File
> > "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line
> > 147, in deco
> >
> > File
> > "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line
> > 328, in get_return_value
> >
> > py4j.protocol.Py4JJavaError: An error occurred while calling
> > o5.registerFunction.
> >
> > : org.apache.flink.table.api.ValidationException: Function class 'class
> > org.apache.flink.table.functions.python.PythonScalarFunction' is not
> > serializable. Make sure that the class is self-contained (i.e. no
> > references to outer classes) and all inner fields are serializable as well.
> >
> > at
> > org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:349)
> >
> > at
> > org.apache.flink.table.functions.UserDefinedFunctionHelper.prepareInstance(UserDefinedFunctionHelper.java:204)
> >
> > at
> > org.apache.flink.table.catalog.FunctionCatalog.registerTempSystemScalarFunction(FunctionCatalog.java:383)
> >
> > at
> > org.apache.flink.table.api.internal.TableEnvironmentImpl.registerFunction(TableEnvironmentImpl.java:357)
> >
> > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> > Method)
> >
> > at
> > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
> >
> > at
> > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >
> > at java.base/java.lang.reflect.Method.invoke(Method.java:567)
> >
> > at
> > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> >
> > at
> > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> >
> > at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> >
> > at
> > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> >
> > at
>

pyflink run error: Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable

2021-04-14 Thread magichuang
Flink version: 1.11.2, Python version 3.6, apache-flink==1.11.2, running Flink on
YARN in per-job mode

The program is written with pyflink: it reads from Kafka, uses a UDF to check whether an IP falls inside an IP range (the third-party package is IPy), and then writes back to Kafka




Main code

t_env.get_config().get_configuration().set_string('taskmanager.memory.task.off-heap.size',
 '128m')

t_env.get_config().get_configuration().set_string("pipeline.jars", 
"file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")

t_env.get_config().get_configuration().set_string("pipeline.classpaths", 
"file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")




t_env.add_python_archive("venv.zip")

t_env.get_config().set_python_executable("venv.zip/venv/bin/python")




@udf(input_types=[DataTypes.STRING(), DataTypes.STRING()], 
result_type=DataTypes.INT())

def judge_ip(src_ip, dst_ip):
    import IPy
    ...

t_env.register_function('judge_ip', judge_ip)
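
(The UDF body is elided above. A hypothetical reconstruction for illustration, modeled on the single-argument judge_ip posted later in this archive; the network and the 0/1 return convention are assumptions, not the author's actual logic:)

@udf(input_types=[DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.INT())
def judge_ip(src_ip, dst_ip):
    import IPy
    net = IPy.IP('192.168.112.0/28')  # example range (assumption)
    # IPy.IP supports membership tests against dotted-quad strings
    return 1 if src_ip in net or dst_ip in net else 0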



Below is the main error

Traceback (most recent call last):

File "traffic-tuple-sf.py", line 59, in 

t_env.register_function('judge_ip', judge_ip)

File 
"/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/table_environment.py", 
line 876, in register_function

File "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", 
line 1286, in __call__

File "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", 
line 147, in deco

File "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", 
line 328, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling 
o5.registerFunction.

: org.apache.flink.table.api.ValidationException: Function class 'class 
org.apache.flink.table.functions.python.PythonScalarFunction' is not 
serializable. Make sure that the class is self-contained (i.e. no references to 
outer classes) and all inner fields are serializable as well.

at 
org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:349)

at 
org.apache.flink.table.functions.UserDefinedFunctionHelper.prepareInstance(UserDefinedFunctionHelper.java:204)

at 
org.apache.flink.table.catalog.FunctionCatalog.registerTempSystemScalarFunction(FunctionCatalog.java:383)

at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.registerFunction(TableEnvironmentImpl.java:357)

at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)

at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)

at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.base/java.lang.reflect.Method.invoke(Method.java:567)

at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)

at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)

at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)

at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)

at java.base/java.lang.Thread.run(Thread.java:831)

Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field 
private final byte[] java.lang.String.value accessible: module java.base does 
not "opens java.lang" to unnamed module @1311d9fb

at 
java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:357)

at 
java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)

at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:177)

at java.base/java.lang.reflect.Field.setAccessible(Field.java:171)

at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:104)

at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)

at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)

at 
org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:346)

... 14 more




Could anyone take a look at what is wrong here and how to fix it? Thanks!
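
The Caused by at the bottom (java.lang.reflect.InaccessibleObjectException: ... module java.base does not "opens java.lang" to unnamed module) is the Java 9+ module system blocking Flink's ClosureCleaner from reflecting into java.lang.String, and the Thread.java:831 frame (versus Thread.java:748 on the Java 8 stacks elsewhere in this archive) suggests the client JVM here is the Java 16 node mentioned upthread. Flink 1.11 targets Java 8/11, so submitting with the 1.8.0_202 JDK is the simple fix; if a newer JVM were unavoidable, opening the module is a possible workaround (an untested sketch, via flink-conf.yaml):

env.java.opts: --add-opens java.base/java.lang=ALL-UNNAMED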









Re: Re: flink on yarn startup failure

2020-12-23 Thread magichuang
Thank you so much!!!

So that's what it was; I assumed -s was the abbreviation for slot... Thanks for the explanation, the job can be submitted now.
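
For the record, the two flags side by side (illustrative commands; the savepoint path is a placeholder):

flink run -m yarn-cluster -ynm traffic -ys 2 -p 2 -ytm 1024 -py traffic.py
    (-ys: number of slots per TaskManager)
flink run -m yarn-cluster -s hdfs:///flink/savepoints/savepoint-xxxx -py traffic.py
    (-s: resume from an existing savepoint path)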


> -- Original message --
> From: "Yang Wang" 
> Sent: 2020-12-24 11:01:46
> To: user-zh 
> Cc:
> Subject: Re: flink on yarn startup failure
>
> There is a problem with your command: flink run -m yarn-cluster -ynm traffic -s 2 -p 2 -ytm 1024 -py
> traffic.py
>
> It should be -ys, not -s.
> -s restores from a savepoint, which is why the error complains about a missing savepoint directory.
>
>
> Best,
> Yang
>
> magichuang wrote on Wed, Dec 23, 2020 at 8:29 PM:
>
> > Machine specs: three 32C64G CentOS 7.8 servers, with a CDH cluster already deployed on them
> > Flink version: 1.11.2, the cluster is built across the three machines
> >
> > The Hadoop cluster was built with CDH
> >
> >
> > Launch command: flink run -m yarn-cluster -ynm traffic -s 2 -p 2 -ytm 1024 -py
> > traffic.py
> >
> > The program is written with pyflink: it reads from Kafka, aggregates each minute's data with a tumbling window, and writes back to Kafka
> >
> >
> >
> >
> > The program runs fine in local mode, but submitting in per-job mode always fails
> >
> > The official example flink run -m yarn-cluster examples/batch/WordCount.jar
> > does produce output, so is this a YARN problem or a problem with my program?
> >
> >
> >
> >
> > Below is the main error
> >
> > Caused by: java.util.concurrent.CompletionException:
> > org.apache.flink.runtime.client.JobExecutionException: Could not
> > instantiate JobManager.
> >
> > at
> > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> > ~[?:1.8.0_202]
> >
> > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > ... 4 more
> >
> > Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> > not instantiate JobManager.
> >
> > at
> > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> > ~[?:1.8.0_202]
> >
> > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > ... 4 more
> >
> > Caused by: java.io.FileNotFoundException: Cannot find checkpoint or
> > savepoint file/directory '2' on file system 'file'.
> >
> > at
> > org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:243)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:110)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1394)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:300)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:253)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.create

flink on yarn startup failure

2020-12-23 Thread magichuang
Machine specs: three 32C64G CentOS 7.8 servers, with a CDH cluster already deployed on them
Flink version: 1.11.2, the cluster is built across the three machines

The Hadoop cluster was built with CDH


Launch command: flink run -m yarn-cluster -ynm traffic -s 2 -p 2 -ytm 1024 -py traffic.py

The program is written with pyflink: it reads from Kafka, aggregates each minute's data with a tumbling window, and writes back to Kafka




The program runs fine in local mode, but submitting in per-job mode always fails

The official example flink run -m yarn-cluster examples/batch/WordCount.jar
does produce output, so is this a YARN problem or a problem with my program?




Below is the main error

Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.client.JobExecutionException: Could not instantiate 
JobManager.

at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 ~[?:1.8.0_202]

at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

... 4 more

Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not 
instantiate JobManager.

at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 ~[?:1.8.0_202]

at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

... 4 more

Caused by: java.io.FileNotFoundException: Cannot find checkpoint or savepoint 
file/directory '2' on file system 'file'.

at 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:243)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:110)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1394)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:300)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:253)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229) 
~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272) 
~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 ~[?:1.8.0_202]

at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

... 4 more

2020-12-23 20:12:46,459 INFO org.apache.flink.runtime.blob.BlobServer [] - 
Stopped BLOB server at 0.0.0.0:16109







The full logs are at the link below: https://note.youdao.com/ynoteshare1/index.html?id=25f1af945e277057c2251e8f60d90f8a=note
It may load slowly; please give it a moment.













Best,

MagicHuang







Re: Re: flink clickhouse connector

2020-12-17 Thread magichuang
Did you write yours in Java or pyflink? My program is written in pyflink, so I need a jar. Do you happen to have one? I just installed Maven locally and am building the package,
but it keeps downloading a huge number of dependencies...
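
If you take the JdbcSink route from the reply quoted below, the driver jar still has to be shipped with the pyflink job; the pipeline.jars pattern used elsewhere in this archive works for that (the jar file name and version are assumptions):

t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///opt/flink-1.11.2/lib/clickhouse4j-1.4.4.jar")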


> -- Original message --
> From: "guoliubi...@foxmail.com" 
> Sent: 2020-12-17 19:36:55
> To: user-zh 
> Cc:
> Subject: Re: flink clickhouse connector
>
> I also write data to ClickHouse. With the official or another third-party JDBC driver (I use https://github.com/blynkkk/clickhouse4j), you can write via JdbcSink; there is no need to build a separate connector.
>
>
>
> guoliubi...@foxmail.com
>
> From: magichuang
> Date: 2020-12-17 18:41
> To: user-zh
> Subject: flink clickhouse connector
> hi, I'd like to ask whether anyone is using Flink
> to write data into ClickHouse. I'm using the flink-connector from https://help.aliyun.com/document_detail/185696.html?spm=a2c4g.11186623.6.606.6222693bubxXzX
> but it fails at runtime:
>
> Caused by: java.io.IOException: unable to establish connection to ClickHouse
>
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:79)
>
> at 
> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)
>
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>
> at 
> org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:73)
>
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.io.IOException: table `default`.`traffic` is not a 
> Distributed table
>
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.establishShardConnections(ClickHouseShardOutputFormat.java:96)
>
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:76)
>
> ... 12 more
>
>
>
>
> But I did create the traffic table in ClickHouse; the Flink version is 1.11.
>
>
>
>
> Has anyone gotten this working? Could you share your connector?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> Best,
>
> MagicHuang
>
>
>
>
>



--

Best,

MagicHuang




flink clickhouse connector

2020-12-17 Thread magichuang
hi, I'd like to ask whether anyone is using Flink
to write data into ClickHouse. I'm using the flink-connector from https://help.aliyun.com/document_detail/185696.html?spm=a2c4g.11186623.6.606.6222693bubxXzX
but it fails at runtime:

Caused by: java.io.IOException: unable to establish connection to ClickHouse

at 
com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:79)

at 
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)

at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)

at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)

at 
org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:73)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)

at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.io.IOException: table `default`.`traffic` is not a Distributed 
table

at 
com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.establishShardConnections(ClickHouseShardOutputFormat.java:96)

at 
com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:76)

... 12 more




But I did create the traffic table in ClickHouse; the Flink version is 1.11.




Has anyone gotten this working? Could you share your connector?
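
The Caused by above is explicit about the cause: this connector requires `default`.`traffic` to use ClickHouse's Distributed engine so it can write to the underlying shards. A sketch of such a table pair (cluster name, columns, and sharding key are all assumptions):

CREATE TABLE default.traffic_local ON CLUSTER my_cluster (
    ip String,
    cnt UInt64
) ENGINE = MergeTree() ORDER BY ip;

CREATE TABLE default.traffic ON CLUSTER my_cluster AS default.traffic_local
ENGINE = Distributed(my_cluster, default, traffic_local, rand());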



















Best,

MagicHuang







Re: Re: pyflink: permission error when installing third-party dependencies

2020-12-16 Thread magichuang
Hi,




When submitting the pyflink job I specify -pyarch venv.zip -pyexec
venv.zip/venv/bin/python. The job uses a UDF, and IPy is a third-party library; installing it directly failed with a permission error, so now I'm trying to solve it with a virtual environment.

set_python_requirements screenshot: https://s3.ax1x.com/2020/12/17/r8J6AI.png




Previously, with the default Python environment, installs failed with permission denied. I just resubmitted with an explicitly specified Python environment and also added
set_python_requirements; there was no permission error this time, and the job was submitted to YARN and got an application id. Does that mean the install into the virtual environment succeeded?
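
For reference, the combination described above, as one sketch of the PyFlink 1.11 API (the file names follow this thread):

# ship a self-contained interpreter to the workers
t_env.add_python_archive("venv.zip")
t_env.get_config().set_python_executable("venv.zip/venv/bin/python")
# and/or install extra dependencies offline from a pre-downloaded cache at startup
t_env.set_python_requirements("requirements.txt", "cached_dir")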




But a new error appeared: Caused by: java.net.ConnectException: Connection refused

Error screenshot: https://s3.ax1x.com/2020/12/17/r8YJKg.png




My CDH environment (Hadoop is also deployed on these three machines):

cdh001  cdh002  cdh003
flink version 1.11

Cluster configuration:

masters: cdh001:8081  cdh002:8081
workers: cdh001 cdh002 cdh003




From the error, the failure happens while connecting to cdh002:31331. When the job was submitted to the YARN cluster I looked for that port on cdh002 and 31331 does not exist;
netstat -ntlp|grep 31331 comes back empty.




What could be causing this?
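
A generic next step in this situation (standard YARN debugging, not specific to this cluster): per-job ports such as 31331 are ephemeral and disappear with the containers, so the real error usually has to be pulled from the aggregated container logs by application id, e.g.

yarn logs -applicationId application_...    (fill in the real application id)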




Best,

Magichuang




> -- Original message --
> From: "Xingbo Huang" 
> Sent: 2020-12-16 12:42:48
> To: user-zh 
> Cc:
> Subject: Re: pyflink: permission error when installing third-party dependencies
>
> Hi,
>
> By default it is whatever environment python points to on each machine; you can also specify a different Python environment with -pyexec
>
> Best,
> Xingbo
>
> magichuang wrote on Tue, Dec 15, 2020 at 8:02 PM:
>
> > Looking at that error again: Flink has already uploaded requirements.txt and cached_dir to HDFS, because
> > /yarn/nm/usercache/root/appcache/application_1608026509770_0001/flink-dist-cache-f2899798-cfd4-42b9-95d3-204e7e34b943/418cd5a449fd6ec4b61a3cc2dca4ea08/requirements.txt
> >
> > and
> >
> > /yarn/nm/usercache/root/appcache/application_1608026509770_0001/flink-dist-cache-f2899798-cfd4-42b9-95d3-204e7e34b943/418cd5a449fd6ec4b61a3cc2dca4ea08/cached_dir
> > exist on the machine at submit time; it's only once the program dies that the
> > /yarn/nm/usercache/root/appcache/application_1608026509770_0001 folder disappears, so HDFS seems fine...
> >
> > So now my question: when Flink pulls in external Python dependencies, where does it install the libraries from the offline packages?
> >
> >
> >
> >
> > The error message reads: Error [Errno 13] Permission denied: '' while executing command
> > python setup.py egg_info
> >
> > because the permission error is raised while running python setup.py
> >
> >
> >
> >
> > Please take a look, thanks!
> >
> >
> >
> >
> > -- Original message --
> >
> > From: magichuang
> >
> > Sent: 2020-12-15 14:15:04
> >
> > To: user-zh
> >
> > Cc:
> >
> > Subject: pyflink: permission error when installing third-party dependencies
> >
> >
> >
> >
> > Quick question: running python demo.py locally works, but submitting to the cluster fails
> >
> > Flink version: 1.11, Flink on YARN cluster deployment, per-job submission, three machines
> >
> >
> >
> >
> > Submit command: flink run -m yarn-cluster -ynm demo -ys 2 -ytm 2048 -p 2 -py demo.py
> >
> >
> >
> >
> > Code screenshot: https://s3.ax1x.com/2020/12/15/rKIwE6.png
> >
> >
> >
> >
> > Error screenshot: https://s3.ax1x.com/2020/12/15/rKIlNT.png
> >
> >
> >
> >
> > requirements.txt: IPy==1.0   cache_dir: IPy-1.00.tar.gz
> >
> >
> >
> >
> > Custom UDF code:
> >
> > @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
> > def judge_ip(ip):
> >     import IPy
> >     if ip in IPy.IP('192.168.112.0/28'):
> >         return 'in'
> >     return 'out'
> >
> >
> >
> >
> >
> >
> >
> > Best regards
> >
> >
> >



--

Best,

MagicHuang




Re: Re: Question: does the pyflink sink support a redis connector?

2020-12-16 Thread magichuang
hi,

I'd like to ask whether this
https://github.com/apache/bahir-flink/tree/master/flink-connector-redis
can be packaged into a jar and then used from pyflink.

I'm not familiar with Java, and that page only explains how to use it from Java and Scala.
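
Building the bahir module with Maven (mvn clean package inside flink-connector-redis) should produce a jar that pyflink can load the same way the kafka connector jar is loaded elsewhere in this archive; the jar file name and version below are assumptions:

t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///opt/flink-1.11.2/lib/flink-connector-redis_2.11-1.1.0.jar")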







Best,

MagicHuang




> -- Original message --
> From: "Dian Fu" 
> Sent: 2020-12-17 10:16:13
> To: user-zh ,hepei...@qq.com
> Cc:
> Subject: Re: Question: does the pyflink sink support a redis connector?
>
> Thanks for Xingbo's reply; one small addition: every connector supported by Table & SQL can be used in PyFlink.
>
> A redis connector is not shipped in the Flink codebase itself; there is one here that should also work: https://github.com/apache/bahir-flink/tree/master/flink-connector-redis
>
> For how to use connectors in PyFlink, see the docs: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html
>
> > On Dec 17, 2020, at 9:52 AM, Xingbo Huang wrote:
> >
> > Hi,
> >
> > As far as I know, Flink does not provide official support for a redis connector [1]; you need to build a custom redis
> > connector against the official interfaces. For how to write a custom connector, see the docs [2]
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/
> > [2]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html
> >
> > Best,
> > Xingbo
> >
> >
> > 消息室 wrote on Thu, Dec 17, 2020 at 9:33 AM:
> >
> >> Hello,
> >>
> >>  
> >> Our project team plans to use pyflink, and I've had the pleasure of reading your blog. I'd like to ask whether the sink in the current pyflink
> >> 1.12.0 release supports a redis connector? Thanks!
> >>   If not, what approach would you suggest?
>






Re: pyflink: permission error when installing third-party dependencies

2020-12-15 Thread magichuang
Looking at that error again: Flink has already uploaded requirements.txt and cached_dir to HDFS, because
/yarn/nm/usercache/root/appcache/application_1608026509770_0001/flink-dist-cache-f2899798-cfd4-42b9-95d3-204e7e34b943/418cd5a449fd6ec4b61a3cc2dca4ea08/requirements.txt

and

/yarn/nm/usercache/root/appcache/application_1608026509770_0001/flink-dist-cache-f2899798-cfd4-42b9-95d3-204e7e34b943/418cd5a449fd6ec4b61a3cc2dca4ea08/cached_dir
exist on the machine at submit time; it's only once the program dies that the
/yarn/nm/usercache/root/appcache/application_1608026509770_0001 folder disappears, so HDFS seems fine...

So now my question: when Flink pulls in external Python dependencies, where does it install the libraries from the offline packages?




The error message reads: Error [Errno 13] Permission denied: '' while executing command python
setup.py egg_info

because the permission error is raised while running python setup.py




Please take a look, thanks!




-- Original message --

From: magichuang 

Sent: 2020-12-15 14:15:04

To: user-zh 

Cc:

Subject: pyflink: permission error when installing third-party dependencies




Quick question: running python demo.py locally works, but submitting to the cluster fails

Flink version: 1.11, Flink on YARN cluster deployment, per-job submission, three machines




Submit command: flink run -m yarn-cluster -ynm demo -ys 2 -ytm 2048 -p 2 -py demo.py




Code screenshot: https://s3.ax1x.com/2020/12/15/rKIwE6.png




Error screenshot: https://s3.ax1x.com/2020/12/15/rKIlNT.png




requirements.txt: IPy==1.0   cache_dir: IPy-1.00.tar.gz




Custom UDF code:

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def judge_ip(ip):
    import IPy
    if ip in IPy.IP('192.168.112.0/28'):
        return 'in'
    return 'out'







Best regards




pyflink: permission error when installing third-party dependencies

2020-12-14 Thread magichuang
Quick question: running python demo.py locally works, but submitting to the cluster fails

Flink version: 1.11, Flink on YARN cluster deployment, per-job submission, three machines




Submit command: flink run -m yarn-cluster -ynm demo -ys 2 -ytm 2048 -p 2 -py demo.py




Code screenshot: https://s3.ax1x.com/2020/12/15/rKIwE6.png




Error screenshot: https://s3.ax1x.com/2020/12/15/rKIlNT.png




requirements.txt: IPy==1.0   cache_dir: IPy-1.00.tar.gz




Custom UDF code:

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def judge_ip(ip):
    import IPy
    if ip in IPy.IP('192.168.112.0/28'):
        return 'in'
    return 'out'







Best regards

pyflink udf dependency problem

2020-12-14 Thread magichuang
Hi all, I just changed the cluster's Flink configuration to python.client.executable: /usr/bin/python3.6,
switching the default Python on all three cluster machines to python3.6.

Flink version: 1.11, Flink on YARN cluster, jobs submitted in per-job mode

Submit command: flink run -m yarn-cluster -ytm 2048 -s 2 -p 2 -py traffic.py

[screenshot]

The error looks like a directory permission problem. The dependencies were installed offline into cached_dir first and then submitted. The Flink cluster was set up as the root user, and the job is submitted as root as well.

[screenshot]




pyflink udf dependency problem

2020-12-14 Thread magichuang
Hi all, I just changed the cluster's Flink configuration to python.client.executable: /usr/bin/python3.6,
switching the default Python on all three cluster machines to python3.6.

Flink version: 1.11, Flink on YARN cluster, jobs submitted in per-job mode

Submit command: flink run -m yarn-cluster -ytm 2048 -s 2 -p 2 -py traffic.py

After submitting, a new problem appeared. The error looks like a directory permission problem; the dependencies were installed offline into cached_dir first and then submitted.

pyflink udf dependency reference problem

2020-12-14 Thread magichuang
When using a pyflink udf that references third-party dependencies, I ran into the following problem.




Running directly with python app.py works fine and produces results, but submitting to the cluster does not.

Flink version 1.11, Flink on YARN cluster deployment, jobs submitted in per-job mode; the cluster has three machines in total and I submit from one of them.

[code screenshot]

Could someone take a look at what causes the error above? Do I need to set up cache_dir on the other two machines in advance?
From the logs it does try to create a new temporary folder on HDFS to hold the dependency tar.gz file.




Best regards




Re: Re: Flink SQL: how to give each job its own memory configuration

2020-12-13 Thread magichuang

Hi all, I've been thinking about this problem too. I deploy Flink on YARN, and every time I submit a job in per-job mode with -ytm
2048, once it is running the available YARN memory drops by 3 GB rather than 2 GB; the actual footprint is 1 GB more than what I requested.
If every SQL job runs on its own, won't that waste resources?
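
One plausible accounting for the extra gigabyte (an assumption based on Flink-on-YARN defaults, not verified against this cluster): per-job mode starts a separate JobManager container alongside the TaskManager container, and YARN rounds each container up to its minimum allocation, so

    JobManager container (~1024 MB) + TaskManager container (-ytm 2048 MB) ≈ 3 GB

i.e. the missing gigabyte is the JobManager, not TaskManager overcounting.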




Also, when writing the program in SQL, within a single job that uses multiple sources, transformations, and sinks, is it possible to specify a separate slot count for each SQL statement?




Best regards

> -- Original message --
> From: "Kyle Zhang" 
> Sent: 2020-12-14 09:25:59
> To: user-zh@flink.apache.org
> Cc:
> Subject: Re: Flink SQL: how to give each job its own memory configuration
>
> How about running one SQL job per cluster?
>
> On Mon, Dec 14, 2020 at 8:42 AM yinghua...@163.com
> wrote:
>
> > When submitting a Flink job you can specify the JobManager
> > and TaskManager memory via parameters, but when executing SQL, how do you give each job its own memory configuration? Don't they all read the same flink-conf.yaml?
> >
> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/config.html#memory-configuration
> > The memory settings there are all driven by the flink-conf.yaml file, i.e. global configuration; I couldn't find a way to configure memory independently per SQL job.
> >
> >
> >
> > yinghua...@163.com
> >