hi~
I came across a problem I didn't understand,I can't use pyflink
aggfuction function properly in window tvf, The following are available:
java aggfuntion
flink system aggfunction
window (not window tvf)
I want to know if this is a bug or if I'm using it the wrong way?
pyflink 1.17.1
可以排查下是不是Flink_Lib_Home里自己“多手“引入了flink-python-x.x.jar,如果是的话,去掉就行,不然会导致beam包冲突
On 2020/11/03 01:44:00 jing wrote:
> Hi, jincheng.
>
> 目前也遇到了类似问题,请问有什么思路吗?
>
> flink-python 的 jar 都是有的,且版本是对的。
>
> 版本是 1.11.1,这个主要是在引入了 udf 时出现的,之前是正常的。
>
> 具体报错如下:
>
> Caused by: java.lang.NoClassDefFoundError: Could
Hi,
根据报错的提示,执行命令./python3.6.8.zip/bin/python3时没法导入pyflink,你可以在本地检查一下你的这个虚拟环境是不是没有成功安上pyflink
Best,
Xingbo
程龙 <13162790...@163.com> 于2022年11月25日周五 16:02写道:
> 在使用pyflink提交任务时,部署模式onyarn
> 1 在不使用Map等算子下如下参数 能够提交成功 并且运行
> .flink run -ynm pytest -m yarn-cluster -pycliente
集群端的 Python 环境中没有安装 PyFlink: ***/python3 这个环境
On Fri, Nov 25, 2022 at 4:02 PM 程龙 <13162790...@163.com> wrote:
> 在使用pyflink提交任务时,部署模式onyarn
> 1 在不使用Map等算子下如下参数 能够提交成功 并且运行
> .flink run -ynm pytest -m yarn-cluster -pyclientexec ***/python3
> -pyexec ***/python3 -pyarch **
在使用pyflink提交任务时,部署模式onyarn
1 在不使用Map等算子下如下参数 能够提交成功 并且运行
.flink run -ynm pytest -m yarn-cluster -pyclientexec ***/python3 -pyexec
***/python3 -pyarch *** /python3.6.8.zip -py demo.py
2 在使用到map算子时 提交没有问题,但是运行报错,报错日志如下:
.flink run -ynm pytest -m yarn-cluster -pyclientexec
是的
Best,
Zhanghao Chen
From: yidan zhao
Sent: Thursday, August 25, 2022 10:20
To: user-zh
Subject: Re: pyflink内存管理
感谢。我是standalone集群,配置到 flink-conf.yaml 就可行吧。
https://stackoverflow.com/questions/64323031/pyflink-1-11-2-couldn-t-configure-taskmanager-memory-task
感谢。我是standalone集群,配置到 flink-conf.yaml 就可行吧。
https://stackoverflow.com/questions/64323031/pyflink-1-11-2-couldn-t-configure-taskmanager-memory-task-off-heap-size-proper
该文章说到的必须通过 tableEnv 配置是因为使用 pyflink-shell ?
我提交是用 flink run 提交的。
yu'an huang 于2022年8月25日周四 09:25写道:
>
> 你好,
> python部分
你好,
python部分的内存算flink taskmanager 配置的内存,你应该可以用参数
*'taskmanager.memory.task.off-heap.size*
来配置,可以参考这个问题:
https://stackoverflow.com/questions/64323031/pyflink-1-11-2-couldn-t-configure-taskmanager-memory-task-off-heap-size-proper
On Wed, 24 Aug 2022 at 1:05 PM, yidan zhao wrote:
> 如题,pyflink
如题,pyflink场景的任务,内存是如何管理呢。
python部分的内存是否算入flink TaskManager配置的内存中呢?
比如python算子通过多进程做各种复杂的运算,这部分内存占用是否算入flink呢?
——
如果不算的话,使用pyflink时,容器内存和flink TaskManager内存配置是不是需要预留空间?
是的,当前 PyFlink 还不支持 side output,side output 的支持已经完成开发,会在接下来发布的 1.16 版本中支持。
On Thu, Aug 4, 2022 at 11:50 AM yidan zhao wrote:
> 1 需求是根据输入流,根据字段判定,拆分并输出为2个流。
>
> 2 目前看 pyflink 的 api,貌似不支持 sideoutput。
>
> 3 虽然可以基于输入流 A,连续处理2次,即输入流 A 流向算子 B 和算子 C,分别筛选自己需要的数据进行处理。但这样会导致数据重复传输。
>
1 需求是根据输入流,根据字段判定,拆分并输出为2个流。
2 目前看 pyflink 的 api,貌似不支持 sideoutput。
3 虽然可以基于输入流 A,连续处理2次,即输入流 A 流向算子 B 和算子 C,分别筛选自己需要的数据进行处理。但这样会导致数据重复传输。
他的相关日志吗?
> >
> > Best,
> > Weihua
> >
> >
> > On Wed, Jul 27, 2022 at 5:36 PM yidan zhao wrote:
> >
> > > 而且pyflink既然打包了flink的完整包,那么真正部署运行的时候是用这个呢?还是需要执行的机器上单独部署一个flink呢?
> > >
> > > yidan zhao 于2022年7月27日周三 17:34写道:
> > >
我是本地直接ide内run。
Weihua Hu 于2022年7月27日周三 22:10写道:
>
> Hi, 你是怎么提交的任务呢?是提交到远端的 session cluster 上吗?有其他的相关日志吗?
>
> Best,
> Weihua
>
>
> On Wed, Jul 27, 2022 at 5:36 PM yidan zhao wrote:
>
> > 而且pyflink既然打包了flink的完整包,那么真正部署运行的时候是用这个呢?还是需要执行的机器上单独部署一个flink呢?
> >
Hi, 你是怎么提交的任务呢?是提交到远端的 session cluster 上吗?有其他的相关日志吗?
Best,
Weihua
On Wed, Jul 27, 2022 at 5:36 PM yidan zhao wrote:
> 而且pyflink既然打包了flink的完整包,那么真正部署运行的时候是用这个呢?还是需要执行的机器上单独部署一个flink呢?
>
> yidan zhao 于2022年7月27日周三 17:34写道:
> >
> > 我将这3个jar放到pyflink的lib下则是可以的。通过 add_
而且pyflink既然打包了flink的完整包,那么真正部署运行的时候是用这个呢?还是需要执行的机器上单独部署一个flink呢?
yidan zhao 于2022年7月27日周三 17:34写道:
>
> 我将这3个jar放到pyflink的lib下则是可以的。通过 add_jar 方式给出是不可以的。有人知道原因吗。
>
> yidan zhao 于2022年7月27日周三 10:40写道:
> >
> > pyflink情况 flink-sql-connector-kafka-1.15.0.jar 可以。
> > 但 f
我将这3个jar放到pyflink的lib下则是可以的。通过 add_jar 方式给出是不可以的。有人知道原因吗。
yidan zhao 于2022年7月27日周三 10:40写道:
>
> pyflink情况 flink-sql-connector-kafka-1.15.0.jar 可以。
> 但 flink-connector-base-1.15.0.jar + flink-connector-kafka-1.15.0.jar +
> kafka-clients-2.8.1.jar 却报:
> py4j.pro
pyflink情况 flink-sql-connector-kafka-1.15.0.jar 可以。
但 flink-connector-base-1.15.0.jar + flink-connector-kafka-1.15.0.jar +
kafka-clients-2.8.1.jar 却报:
py4j.protocol.Py4JError:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer does
not exist in the JVM
Weihua Hu 于2022年7月26日周二 21:21写道
最终会放到 pipeline.jars 配置中,在提交作业时上传到 blobServer
Best,
Weihua
On Tue, Jul 26, 2022 at 5:40 PM yidan zhao wrote:
> 如题,我看注释和文档。
> add_jars 是添加要upload到cluster的jar,那么上传到什么路径呢?
>
如题,我看注释和文档。
add_jars 是添加要upload到cluster的jar,那么上传到什么路径呢?
Yuxia
>
> - 原始邮件 -
> 发件人: "张 兴博"
> 收件人: "user-zh"
> 发送时间: 星期一, 2022年 6 月 20日 上午 8:54:21
> 主题: 关于PyFlink的环境问题,期望得到回复。
>
> 您好:
>
>
> 我是西南财经大学一个在读博士,目前想使用PyFlink,但是遇到了一个问题,我的集群采用的CDH6.2.0,其中的Hadoop为3.0.0版本。当我在pytlink
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/yarn/#preparation
张 兴博 于2022年6月20日周一 09:36写道:
> 您好:
>
>
> 我是西南财经大学一个在读博士,目前想使用PyFlink,但是遇到了一个问题,我的集群采用的CDH6.2.0,其中的Hadoop为3.0.0版本。当我在pytlink的程序里引入hadoop-common-3.0.0.jar(或hadoop-common-3.
.jar
Best regards,
Yuxia
- 原始邮件 -
发件人: "张 兴博"
收件人: "user-zh"
发送时间: 星期一, 2022年 6 月 20日 上午 8:54:21
主题: 关于PyFlink的环境问题,期望得到回复。
您好:
我是西南财经大学一个在读博士,目前想使用PyFlink,但是遇到了一个问题,我的集群采用的CDH6.2.0,其中的Hadoop为3.0.0版本。当我在pytlink的程序里引入hadoop-common-3.0.0.jar(或hadoop-common-3.0.0
您好:
我是西南财经大学一个在读博士,目前想使用PyFlink,但是遇到了一个问题,我的集群采用的CDH6.2.0,其中的Hadoop为3.0.0版本。当我在pytlink的程序里引入hadoop-common-3.0.0.jar(或hadoop-common-3.0.0-cdh6.2.0.jar)的时候,运行程序就会报错:
Caused by: java.lang.NoClassDefFoundError: Could not initialize class
org.apache.hadoop.security.UserGroupInformation
Huang wrote:
> Hi,
>
> 你可以执行 pip install -r flink-python/dev/dev-requirements.txt 安装开发环境所需要的依赖
>
> Best,
> Xingbo
>
> 张 兴博 于2022年6月15日周三 10:20写道:
>
> > 您好:
> > 我是一名学习使用pyflink的用户,我想在ubuntu20.04上开发pyflink,但是在运行代码的时候,报错为:
> >
> > Traceback
Hi,
你可以执行 pip install -r flink-python/dev/dev-requirements.txt 安装开发环境所需要的依赖
Best,
Xingbo
张 兴博 于2022年6月15日周三 10:20写道:
> 您好:
>我是一名学习使用pyflink的用户,我想在ubuntu20.04上开发pyflink,但是在运行代码的时候,报错为:
>
> Traceback (most recent call last):
> File "/root/.py"
您好:
我是一名学习使用pyflink的用户,我想在ubuntu20.04上开发pyflink,但是在运行代码的时候,报错为:
Traceback (most recent call last):
File "/root/.py", line 6, in
s_env = StreamExecutionEnvironment.get_execution_environment()
File
"/usr/local/lib/python3.8/dist-packages/pyf
Hi,
感谢,查询pyflink目录下,里面确实存在多个版本的jar包,我清理了下,可以运行起来了,
看来是PyCharm的bug了,安装新版本的时候没有成功清理旧的版本
Thanks~
在 2022-05-23 19:27:42,"Dian Fu" 写道:
>>> java.lang.NoSuchMethodError:
>org.apache.flink.util.NetUtils.getAvailablePort()I
>
>你的环境是不是不太干净?可以检查一下 PyFlink 安装目录下(site-packag
>> java.lang.NoSuchMethodError:
org.apache.flink.util.NetUtils.getAvailablePort()I
你的环境是不是不太干净?可以检查一下 PyFlink 安装目录下(site-packages/pyflink/lib) 的那些 jar 包的版本。
On Mon, May 23, 2022 at 4:22 PM RS wrote:
> Hi,
> 在Pycharm中,测试Pyflink示例代码,启动运行报错,代码为官方文档中的代码
> 参考官方文档:
> https://ni
Hi,
在Pycharm中,测试Pyflink示例代码,启动运行报错,代码为官方文档中的代码
参考官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/table_api_tutorial/
报错如下:
Exception in thread "main" java.util.concurrent.ExecutionException:
java.lang.NoSuchM
PRIMARY KEY(currency) NOT ENFORCED ) WITH (
> 'connector' = 'kafka', 'value.format' = 'debezium-json',/* ... */
> );基于以上,我的疑惑共有几点:1.是维表不能使用‘datagen’数据来源吗?还是pyflink 1.14
> 目前不支持维表ddl?2.阿里云的维表ddl语句和官网的维表ddl语句有什么区别吗?他们的使用场景是什么?3.如何能够让我在练习的时候正确使用维表join请大家指点一下,感激不尽
MARK FOR update_time AS
update_time, PRIMARY KEY(currency) NOT ENFORCED ) WITH ( 'connector' =
'kafka', 'value.format' = 'debezium-json',/* ... */
);1.datagen??pyflink
1.14
??ddl??2.dd
2021 at 10:48 AM Asahi Lee
<978466...@qq.com.invalidgt;
wrote:
gt; ??source
my_env/bin/activate??PYFLINK_CLIENT_EXECUTABLE??
gt; jobmanagerNo module named
pyflinkjobmanageryarn
gt; ??
Flink用的1.14.0,venv.zip中的PyFlink版本是多少?
On Sun, Nov 21, 2021 at 7:59 PM Asahi Lee wrote:
> Hi!
> 我通过如下命令提交成功了,python的参数需求-D方式传入,-py方式传入不生效:
> ./flink-1.14.0/bin/flink
> run-application -t yarn-application
> -Dyarn.provided.lib.dirs="hdfs://nameservice1
??source
my_env/bin/activate??PYFLINK_CLIENT_EXECUTABLE??????
jobmanagerNo module named
pyflinkjobmanageryarn
??
gt; LogType:jobmanager.out
gt; Log Upload Time:?? ?? 18 20:48:45 +0800 2021
gt;
刚注意到你用的YARN application模式,PyFlink 1.14.0才支持YARN application模式,主要是新增了命令行选项“
-pyclientexec” 和配置“python.client.executable”:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/python_config/#python-client-executable
对于你这个作业来说,你需要通过使用1.14.0版本,同时添加命令行选项:-pyclientexec venv.zip
??source
my_env/bin/activate??PYFLINK_CLIENT_EXECUTABLE??
jobmanagerNo module named
pyflinkjobmanageryarn
??
LogType:jobmanager.out
Log Upload Time:?? ?? 18 20:48:45 +0800 2021
a Table api中使用python udf
> 函数,通过下面的命令提交应用,报无法启动python服务错误,请问我的提交方式对吗?jm日志为/bin/python: No module named
> pyflink。
>
>
> ./flink-1.13.2/bin/flink
> run-application -t yarn-application
> -Dyarn.provided.lib.dirs="hdfs://nameservice1/user/flink/flinklib"
> -Dyarn.appl
Hi !
java Table api??python udf
pythonjm??/bin/python:
No module named pyflink??
./flink-1.13.2/bin/flink
run-application -t yarn-application
-Dyarn.provided.lib.dirs="hdfs://nameservice1/user/flink/fli
Hi!??https://nightlies.apache.org/flink/flink-docs-release-1.12/downloads/setup-pyflink-virtual-env.sh
?? 2021-11-18 15:05:03??"Asahi Lee" <978466...@qq.com.INVALID> ??
>Hi!
>
>flink??setup-pyflink-virtua
Hi!
flink??setup-pyflink-virtual-env.sh
python??
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/python/faq/
FYI
On Wed, Nov 10, 2021 at 9:31 AM Dian Fu wrote:
> 也可以通过以下方式:
> - Python libraries [1]: 把PyFlink以及其他相关依赖打包,作为依赖指定
> - Python archieves [2]: 构建Python虚拟环境,并在里面安装PyFlink以及其他依赖,作为依赖指定
>
>
> 但是上述方式相对于直接在集群上安装来说,提交作业的时候,Flink内部需要把相关文件分发到集群节点上,如果文件比较大,有一点的overhead,会降低启动速
也可以通过以下方式:
- Python libraries [1]: 把PyFlink以及其他相关依赖打包,作为依赖指定
- Python archieves [2]: 构建Python虚拟环境,并在里面安装PyFlink以及其他依赖,作为依赖指定
但是上述方式相对于直接在集群上安装来说,提交作业的时候,Flink内部需要把相关文件分发到集群节点上,如果文件比较大,有一点的overhead,会降低启动速度。
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python
HI!
??flink 1.13.2??java table
apipython
udf??yarn-applicationyarn??pyflink?
图挂了,邮件列表不能直接发图片。可以发一下更详细的日志信息吗?
On Tue, Oct 19, 2021 at 6:34 PM xuzh wrote:
> 错误日志
> Exception in thread Thread-14:
> Traceback (most recent call last):
> File "D:\Anaconda3\envs\py37\lib\threading.py", line 926, in
> _bootstrap_inner
> self.run()
> File
>
Exception in thread Thread-14:
Traceback (most recent call last):
File "D:\Anaconda3\envs\py37\lib\threading.py", line 926, in
_bootstrap_inner
self.run()
File
"D:\Anaconda3\envs\py37\lib\site-packages\apache_beam\runners\worker\data_plane.py",
line 218, in run
while not
----
??:
"user-zh"
??udfudfjar??
-- --
??:
??udfudfjar??
----
??:
我试了一下是可以运行的,可以发一下报错吗?
On Mon, Oct 18, 2021 at 6:44 PM xuzh wrote:
> from pyflink.table import ScalarFunction, EnvironmentSettings,
> TableEnvironment, DataTypes
> from pyflink.table.udf import udf
> from pyflink.table.expressions import call, row
>
>
> class HashCode(ScalarFunction):
> def
from pyflink.table import ScalarFunction, EnvironmentSettings,
TableEnvironment, DataTypes
from pyflink.table.udf import udf
from pyflink.table.expressions import call, row
class HashCode(ScalarFunction):
def __init__(self):
self.factor = 12
def eval(self, s):
return
ight", "credits" or "license" for more information.
>>> import pyflink
>>> pyflink.__file__
'/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pyflink/__init__.py'
>>> exit()
$ cd
/Library/Frameworks/Pytho
1]).process(MyFunction())
> results.print()
>
> env.execute("test_job")
>
> if __name__ == "__main__":
> main()
>
>
> Dian Fu 于2021年7月12日周一 下午4:48写道:
>
>> Hi,
>>
>> 是否发一下可复现的完整示例?
>>
>> Regards,
>> D
t_job")
if __name__ == "__main__":
main()
--
On 2021/07/12 08:47:59, Dian Fu wrote:
> Hi,
>
> 是否发一下可复现的完整示例?
>
> Regards,
> Dian
>
> > 2021年7月10日 下午7:44,赵飞 写道:
> >
> > 各位好,请教一个问题。
> >
> > 最近我在使用pyflink开发一个模块,主要的功能是基于规则对用户数据
2021年7月12日周一 下午4:48写道:
> Hi,
>
> 是否发一下可复现的完整示例?
>
> Regards,
> Dian
>
> > 2021年7月10日 下午7:44,赵飞 写道:
> >
> > 各位好,请教一个问题。
> >
> >
> 最近我在使用pyflink开发一个模块,主要的功能是基于规则对用户数据进行计算和判断。涉及到两个流:数据流(data)和规则流(rule),两者都包含一个产品id值,所以将该值作为key来分区,处理的代码
Hi,
是否发一下可复现的完整示例?
Regards,
Dian
> 2021年7月10日 下午7:44,赵飞 写道:
>
> 各位好,请教一个问题。
>
> 最近我在使用pyflink开发一个模块,主要的功能是基于规则对用户数据进行计算和判断。涉及到两个流:数据流(data)和规则流(rule),两者都包含一个产品id值,所以将该值作为key来分区,处理的代码大致如下:
>
> ---
> results = data.connect(rules).key_by('product_id',
> 'prod
各位好,请教一个问题。
最近我在使用pyflink开发一个模块,主要的功能是基于规则对用户数据进行计算和判断。涉及到两个流:数据流(data)和规则流(rule),两者都包含一个产品id值,所以将该值作为key来分区,处理的代码大致如下:
---
results = data.connect(rules).key_by('product_id',
'product_id').process(MyFunction())
results.print()
class MyFunction(KeyedCoProcessFunction):
def open(self, ctx
要用fat jar:
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.13.1/flink-sql-connector-kafka_2.11-1.13.1.jar
> 2021年6月2日 下午2:43,qianhuan <819687...@qq.com> 写道:
>
> 版本:
> python 3.8
> apache-flink 1.13.1
> apache-flink-libraries 1.13.1
>
> 代码:
>
是不是connector版本问题,之前1.12.2可以跑,有没有大神帮忙看下
--
Sent from: http://apache-flink.147419.n8.nabble.com/
版本:
python 3.8
apache-flink 1.13.1
apache-flink-libraries 1.13.1
代码:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
def log_processing():
env = StreamExecutionEnvironment.get_execution_environment()
https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q
????pyflink??jdbc??jar??jdbc??flink??1.13.1
gt;
gt;
gt;
gt; from pyflink.datastream import StreamExecutionEnvironment
gt; from pyflink.table import StreamTableE
gt; 发送时间:2021年6月1日(星期二) 下午5:30
> 收件人:"user-zh"
> 主题:回复: Pyflink jdbc相关
>
>
>
>
>
> 感谢,我换成2.11确实可以了
>
>
> -- 原始邮件 --
> 发件人:
;
??
----
??:
""
<1129656...@qq.com;
:2021??6??1??(??) 5:30
??:"user-zh"https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q
pyflink
er-zh"
https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q
????pyflink??jdbc??jar??jdbc??flink??1.13.1
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableE
Hi,
本地执行:
1)用flink-connector-jdbc_2.11-1.13.1.jar试试?因为PyFlink里默认待的JAR包是scala 2.11的
flink run:
1) 你注册的sink表的名字为“print”,不是”table_sink”,但是在SQL语句里用的table_sink。
> 2021年6月1日 下午4:33,琴师 <1129656...@qq.com> 写道:
>
> Hi,
> 我按着微信分享https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q
>
Hi??
??https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q
pyflink??jdbc??jar??jdbc??flink??1.13.1
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
非常感谢
--
Sent from: http://apache-flink.147419.n8.nabble.com/
用Table API的话,可以看一下这个:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tableapi/#joins
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tableapi/#joins>
另外,也可以直接在PyFlink中调用SQL语句:
https://ci.apache.org/projects/flink/flink-docs-releas
想实现pyflink双流join,没有找到相关示例,有没有大神指导下用pyflink是否能实现?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi all,
【填问卷,抽奖送T恤!】它来了它来了 ~ 为了更好地服务 PyFlink 用户,帮助 PyFlink 用户将 PyFlink 应用到生产环境中,Apache
Flink 中文社区接下来计划推出一系列 PyFlink 的相关的文档及参考资料,让 PyFlink 用户得到更多优质的 PyFlink 学习资料!
为此我们推出这个调查问卷,了解大家感兴趣的内容,希望大家积极参与这个问卷,帮助我们更好的去整理 PyFlink 相关学习资料~
PS:填完问卷后即可参与抽奖,Flink 定制款 Polo 衫送送送!4月30日中午12:00准时开奖哦 ~
https
非常感谢,已解决,sql写错了。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
把flink-connector-kafka_2.11-1.12.2.jar删了
> 2021年4月19日 下午6:08,qianhuan <819687...@qq.com> 写道:
>
> 感谢回复
> 导入了flink-sql-connector-kafka_2.11-1.12.2.jar和flink-connector-kafka_2.11-1.12.2.jar,并且放到了site-packages/pyflink/lib目录下,还是一样的报错。
> test_source_table_1这个ka
感谢回复
导入了flink-sql-connector-kafka_2.11-1.12.2.jar和flink-connector-kafka_2.11-1.12.2.jar,并且放到了site-packages/pyflink/lib目录下,还是一样的报错。
test_source_table_1这个kafka的表应该是创建成功了,是查询的问题吗?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
'topic' = 'test1',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'scan.startup.mode' = 'latest-offset',
> 'format' = 'json'
>)
>"""
> pyflink版本:
> apache-flink 1.12.2
>
> 导
'localhost:9092',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
pyflink版本:
apache-flink 1.12.2
导入的jar包:flink-connector-kafka_2.11-1.12.2.jar
python执行报错信息:
py4j.protocol.Py4JJavaError: An error occu
感谢~ 通过多次调试 是打的venv 包有问题, 已经解决了 现在可以在集群上跑了谢谢~
> -- 原始邮件 --
> 发 件 人:"Dian Fu"
> 发送时间:2021-04-15 10:32:49
> 收 件 人:user-zh ,magichu...@88.com
> 抄 送:
> 主 题:Re: Re: pyflin
命令:/opt/flink-1.11.2/bin/flink run -m yarn-cluster -ynm traffic -ys 1
> -ytm 1024m -p 1 -py traffic.py
>
>
>
>
> > -- 原始邮件 --
> > 发 件 人:"Dian Fu"
> > 发送时间:2021-04-14 23:11:57
> > 收 件 人:user-zh
> > 抄 送:
> > 主
--
> 发 件 人:"Dian Fu"
> 发送时间:2021-04-14 23:11:57
> 收 件 人:user-zh
> 抄 送:
> 主 题:Re: pyflink 运行提示:Function class 'class
> org.apache.flink.table.functions.python.PythonScalarFunction' is not
> serializable
>
> 你JDK版本多少? 看起来像是Java环境的问题。这里有一个相似的问题[1],看下是否有帮
ink on
> yarn,per-job模式
>
> 程序使用pyflink开发的,从kafka读取数据,然后通过udf 判断一个IP是否在一个IP段中,引入的第三方包是IPy,然后再写入kafka中
>
>
>
>
> 主要代码
>
> t_env.get_config().get_configuration().set_string('taskmanager.memory.task.off-heap.size',
> '128m')
>
> t_env.get_config().get_configuration().
flink版本:1.11.2 Python版本 3.6 apache-flink==1.11.2, 用的是flink on
yarn,per-job模式
程序使用pyflink开发的,从kafka读取数据,然后通过udf 判断一个IP是否在一个IP段中,引入的第三方包是IPy,然后再写入kafka中
主要代码
t_env.get_config().get_configuration().set_string('taskmanager.memory.task.off-heap.size',
'128m')
t_env.get_config
处理逻辑看起来应该是没有问题的。
1)可以详细说一下,你说的数据延迟问题吗?现在的qps可以达到多少,预期是多少?
2)你现在用的哪种部署模式?
3)并发度的设置可以参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/table_environment.html#configuration
hidden email
在2021年04月06日 11:36,苗红宾 写道:
你好:
业务场景是:数据源是kafka,10分钟总数据量在10G左右,里面包括200个城市的数据,期望使用滚动窗口,按城市分组,每2分钟触发一次将所有城市的过去十分钟数据放到各自的list里,然后转换成pandas,针对该城市做一次整体计算,每次每个城市的计算耗时平均在60s左右。
现在的使用方式:
1、slide_window =
Slide.over(f"10.minutes").every(f"2.minutes").on('ts').alias("w")
2、使用sql语句注册kafka
你好:
业务场景是:数据源是kafka,10分钟总数据量在10G左右,里面包括200个城市的数据,期望使用滚动窗口,按城市分组,每2分钟触发一次将所有城市的过去十分钟数据放到各自的list里,然后转换成pandas,针对该城市做一次整体计算,每次每个城市的计算耗时平均在60s左右。
现在的使用方式:
1、slide_window =
Slide.over(f"10.minutes").every(f"2.minutes").on('ts').alias("w")
2、使用sql语句注册kafka connector,
3、result table使用普通的print:
CREATE
感谢回复!
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi,
经过排查,这个确实一个bug。问题出在没有正确处理在sub-query中使用的python udf。我已经创建JIRA[1]
来记录这个问题了。目前的workaroud方案是使用Table API。
具体可以参考下面的代码:
>>>
a = t_env.sql_query("""
SELECT
hotime ,
before_ta ,
before_rssi ,
after_ta ,
after_rssil ,
nb_tath ,
nb_rssith ,
apache-flink 1.11.1
--
Sent from: http://apache-flink.147419.n8.nabble.com/
quot;,
line 280, in
t_env.execute('NT重连预测参数')
File
"D:\tools\Python3.6.5\lib\site-packages\pyflink\table\table_environment.py",
line 1057, in execute
return JobExecutionResult(self._j_tenv.execute(job_name))
File "D:\tools\Python3.6.5\lib\site-packages\py4j\java_gateway.
Hi,
其实pyflink作业就两种,一种是用了python udf的,一种是没用python udf
1. 对于没用python
udf的作业,你写的所有python代码就是api层调用,只负责在客户端编译作业。你可以认为实际运行的作业代码全都是java的同一套代码,都是在JVM里面跑的,也就不存在性能差别,如果你觉得哪个操作性能不行,那就得去分析java对应算子的性能问题。
2. 对于用了python
udf的作业,因为你写的udf函数内容是python代码,这种代码在运行时JVM不认识,你需要有PVM执行这种代码,所以会起python进程专门执行udf里面的内容,所以涉及到IPC通
Hi, Xingbo
想跟您了解一下关于sql_query执行上的细节,flink1.12版本底层执行sql语句的过程中,是否有谓词下退的优化?
从相关的代码测试结果看:
1. pyflink1.11版本的connector定义支持参数read.query来获取数据,执行效率很高,猜测这部分执行交由数据库完成;
2. pyflink1.12版本取消了read.query参数,当定义多个数据源执行join等操作时,耗时很明显(pyflink)
所以,基于上述这种情况,想跟您请教一下这部分耗时,也是因为python的语言缺陷,或者ipc开销?还是底层的实
你好,谢谢你的回复,现在更新到V1.12就可以直接运行了
--
Sent from: http://apache-flink.147419.n8.nabble.com/
-aggregate-functions
Best,
Xingbo
xiaoyue 于2021年3月16日周二 上午11:42写道:
> 您好,
> 目前同样在做pyflink 结合pandas的分布式计算调研和尝试,对于您的问题,仅有一些经验性的分享。
> pyflink以后应该都会集成到DataStream,所以应该不会再支持DataSet;
> 不建议在计算中间采用 table.to_pandas()的方式进行table和dataFrame互转,会影响计算效率;
> 目前采用的计算效率较好的方式,是定义pandas类型的udf/udaf
您好,
目前同样在做pyflink 结合pandas的分布式计算调研和尝试,对于您的问题,仅有一些经验性的分享。
pyflink以后应该都会集成到DataStream,所以应该不会再支持DataSet;
不建议在计算中间采用 table.to_pandas()的方式进行table和dataFrame互转,会影响计算效率;
目前采用的计算效率较好的方式,是定义pandas类型的udf/udaf方式,但相较java版接口同样的方式,pyflink还是会慢很多;
个人感觉,pyflink耗时较多的地方,还是sql_query的操作,相同sql语句,执行效率
Hi,
有几个疑问:
1)你说的map reduce函数具体指的什么?可以举一个例子吗?
2)DataSet API指的是Java的DataSet API吗?另外,Java的DataSet API会逐步废弃,统一到DataStream
API上来,所以PyFlink里不会支持DataSet API,只支持Python Table API和Python DataStream API
> 2021年3月13日 上午10:54,nova.he 写道:
>
> 你好,
>
> 最近项目想使用flink进行分布式计算,之前项目是Pytho
简单提供了下 可复现的例子,请帮忙看看~谢谢!
--
Sent from: http://apache-flink.147419.n8.nabble.com/
你好,
最近项目想使用flink进行分布式计算,之前项目是Python的pandas项目,想尝试用pyflink进行项目改造,在使用dataset做批处理时,对于Java的版本没有相关map
reduce函数,所以有以下疑问:
1.Python flink的SDK还没支持dataset吗?
2.是不是有其他替代方法?
3.如果还没支持,有计划支持的时间吗?
4.flink table为啥不支持map reduce操作?
5.我们项目使用dataframe来处理数据,能放到flink上做分布式运算吗?dataframe直接转化为table的方式,table不支持map
reduce操作,对
apache-flink 1.11.1
--
Sent from: http://apache-flink.147419.n8.nabble.com/
###
## 可执行
###
t_env.sql_query("""
SELECT
hotime ,
before_ta ,
before_rssi ,
after_ta ,
after_rssil ,
nb_tath ,
nb_rssith ,
train_and_predict(hotime, before_ta) predict
FROM
source
""").insert_into("print_table")
t_env.execute('pyflink UDTF')
--
Sent from: http://apache-flink.147419.n8.nabble.com/
你好,
最近项目想使用flink进行分布式计算,之前项目是Python的pandas项目,想尝试用pyflink进行项目改造,在使用dataset做批处理时,没有相关map
reduce函数,所以有以下疑问:
1.Python flink的SDK还没支持dataset吗?
2.是不是有其他替代方法?
3.如果还没支持,有计划支持的时间吗?
4.flink table为啥不支持map reduce操作?
5.我们项目使用dataframe来处理数据,能放到flink上做分布式运算吗?dataframe直接转化为table的方式,table不支持map
reduce操作,对应
从报错看,似乎是作业运行的时候,找不到pyflink,如果确实是这样的话,有几个解决方案:
- 通过API指定集群端的Python路径: set_python_executable,参考 [1]
- 通过配置python.executable,参考[2]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/dependency_management.html
[2]
https://ci.apache.org/projects/flink
1)在PyFlink Table API中可以使用所有SQL中支持的connector,所以HBase
connector也自然支持,具体使用方式可以看一下文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#how-to-use-connectors
2)HBase connector的使用方式可以看一下:
https://ci.apache.org/projects/flink
共有 439 项搜索結果,以下是第 1 - 100 matches
Mail list logo