pyflink aggfunction in window tvf can not sink connection='kafka', it notice consuming update changesm, java aggfunction and aggfunction in flink, such as sum is ok

2023-08-26 文章 faronzz
hi~ I came across a problem I didn't understand,I can't use pyflink aggfuction function properly in window tvf, The following are available: java aggfuntion flink system aggfunction window (not window tvf) I want to know if this is a bug or if I'm using it the wrong way? pyflink 1.17.1

RE: Re: 【PyFlink】对于数据以Csv()格式写入kafka报错,以及使用python udf时无法启动udf

2022-12-20 文章 kung harold
可以排查下是不是Flink_Lib_Home里自己“多手“引入了flink-python-x.x.jar,如果是的话,去掉就行,不然会导致beam包冲突 On 2020/11/03 01:44:00 jing wrote: > Hi, jincheng. > > 目前也遇到了类似问题,请问有什么思路吗? > > flink-python 的 jar 都是有的,且版本是对的。 > > 版本是 1.11.1,这个主要是在引入了 udf 时出现的,之前是正常的。 > > 具体报错如下: > > Caused by: java.lang.NoClassDefFoundError: Could

Re: Pyflink提交

2022-11-25 文章 Xingbo Huang
Hi, 根据报错的提示,执行命令./python3.6.8.zip/bin/python3时没法导入pyflink,你可以在本地检查一下你的这个虚拟环境是不是没有成功安上pyflink Best, Xingbo 程龙 <13162790...@163.com> 于2022年11月25日周五 16:02写道: > 在使用pyflink提交任务时,部署模式onyarn > 1 在不使用Map等算子下如下参数 能够提交成功 并且运行 > .flink run -ynm pytest -m yarn-cluster -pycliente

Re: Pyflink提交

2022-11-25 文章 Dian Fu
集群端的 Python 环境中没有安装 PyFlink: ***/python3 这个环境 On Fri, Nov 25, 2022 at 4:02 PM 程龙 <13162790...@163.com> wrote: > 在使用pyflink提交任务时,部署模式onyarn > 1 在不使用Map等算子下如下参数 能够提交成功 并且运行 > .flink run -ynm pytest -m yarn-cluster -pyclientexec ***/python3 > -pyexec ***/python3 -pyarch **

Pyflink提交

2022-11-25 文章 程龙
在使用pyflink提交任务时,部署模式onyarn 1 在不使用Map等算子下如下参数 能够提交成功 并且运行 .flink run -ynm pytest -m yarn-cluster -pyclientexec ***/python3 -pyexec ***/python3 -pyarch *** /python3.6.8.zip -py demo.py 2 在使用到map算子时 提交没有问题,但是运行报错,报错日志如下: .flink run -ynm pytest -m yarn-cluster -pyclientexec

Re: pyflink内存管理

2022-08-25 文章 zhanghao.chen
是的 Best, Zhanghao Chen From: yidan zhao Sent: Thursday, August 25, 2022 10:20 To: user-zh Subject: Re: pyflink内存管理 感谢。我是standalone集群,配置到 flink-conf.yaml 就可行吧。 https://stackoverflow.com/questions/64323031/pyflink-1-11-2-couldn-t-configure-taskmanager-memory-task

Re: pyflink内存管理

2022-08-24 文章 yidan zhao
感谢。我是standalone集群,配置到 flink-conf.yaml 就可行吧。 https://stackoverflow.com/questions/64323031/pyflink-1-11-2-couldn-t-configure-taskmanager-memory-task-off-heap-size-proper 该文章说到的必须通过 tableEnv 配置是因为使用 pyflink-shell ? 我提交是用 flink run 提交的。 yu'an huang 于2022年8月25日周四 09:25写道: > > 你好, > python部分

Re: pyflink内存管理

2022-08-24 文章 yu'an huang
你好, python部分的内存算flink taskmanager 配置的内存,你应该可以用参数 *'taskmanager.memory.task.off-heap.size* 来配置,可以参考这个问题: https://stackoverflow.com/questions/64323031/pyflink-1-11-2-couldn-t-configure-taskmanager-memory-task-off-heap-size-proper On Wed, 24 Aug 2022 at 1:05 PM, yidan zhao wrote: > 如题,pyflink

pyflink内存管理

2022-08-23 文章 yidan zhao
如题,pyflink场景的任务,内存是如何管理呢。 python部分的内存是否算入flink TaskManager配置的内存中呢? 比如python算子通过多进程做各种复杂的运算,这部分内存占用是否算入flink呢? —— 如果不算的话,使用pyflink时,容器内存和flink TaskManager内存配置是不是需要预留空间?

Re: pyflink目前map、flatmap都是process实现,那么process当前如何支持sideoutput呢?

2022-08-04 文章 Dian Fu
是的,当前 PyFlink 还不支持 side output,side output 的支持已经完成开发,会在接下来发布的 1.16 版本中支持。 On Thu, Aug 4, 2022 at 11:50 AM yidan zhao wrote: > 1 需求是根据输入流,根据字段判定,拆分并输出为2个流。 > > 2 目前看 pyflink 的 api,貌似不支持 sideoutput。 > > 3 虽然可以基于输入流 A,连续处理2次,即输入流 A 流向算子 B 和算子 C,分别筛选自己需要的数据进行处理。但这样会导致数据重复传输。 >

pyflink目前map、flatmap都是process实现,那么process当前如何支持sideoutput呢?

2022-08-03 文章 yidan zhao
1 需求是根据输入流,根据字段判定,拆分并输出为2个流。 2 目前看 pyflink 的 api,貌似不支持 sideoutput。 3 虽然可以基于输入流 A,连续处理2次,即输入流 A 流向算子 B 和算子 C,分别筛选自己需要的数据进行处理。但这样会导致数据重复传输。

Re: pyflink 和 add_jars 的 add_classpaths 路径。

2022-07-31 文章 Dian Fu
他的相关日志吗? > > > > Best, > > Weihua > > > > > > On Wed, Jul 27, 2022 at 5:36 PM yidan zhao wrote: > > > > > 而且pyflink既然打包了flink的完整包,那么真正部署运行的时候是用这个呢?还是需要执行的机器上单独部署一个flink呢? > > > > > > yidan zhao 于2022年7月27日周三 17:34写道: > > >

Re: pyflink 和 add_jars 的 add_classpaths 路径。

2022-07-27 文章 yidan zhao
我是本地直接ide内run。 Weihua Hu 于2022年7月27日周三 22:10写道: > > Hi, 你是怎么提交的任务呢?是提交到远端的 session cluster 上吗?有其他的相关日志吗? > > Best, > Weihua > > > On Wed, Jul 27, 2022 at 5:36 PM yidan zhao wrote: > > > 而且pyflink既然打包了flink的完整包,那么真正部署运行的时候是用这个呢?还是需要执行的机器上单独部署一个flink呢? > >

Re: pyflink 和 add_jars 的 add_classpaths 路径。

2022-07-27 文章 Weihua Hu
Hi, 你是怎么提交的任务呢?是提交到远端的 session cluster 上吗?有其他的相关日志吗? Best, Weihua On Wed, Jul 27, 2022 at 5:36 PM yidan zhao wrote: > 而且pyflink既然打包了flink的完整包,那么真正部署运行的时候是用这个呢?还是需要执行的机器上单独部署一个flink呢? > > yidan zhao 于2022年7月27日周三 17:34写道: > > > > 我将这3个jar放到pyflink的lib下则是可以的。通过 add_

Re: pyflink 和 add_jars 的 add_classpaths 路径。

2022-07-27 文章 yidan zhao
而且pyflink既然打包了flink的完整包,那么真正部署运行的时候是用这个呢?还是需要执行的机器上单独部署一个flink呢? yidan zhao 于2022年7月27日周三 17:34写道: > > 我将这3个jar放到pyflink的lib下则是可以的。通过 add_jar 方式给出是不可以的。有人知道原因吗。 > > yidan zhao 于2022年7月27日周三 10:40写道: > > > > pyflink情况 flink-sql-connector-kafka-1.15.0.jar 可以。 > > 但 f

Re: pyflink 和 add_jars 的 add_classpaths 路径。

2022-07-27 文章 yidan zhao
我将这3个jar放到pyflink的lib下则是可以的。通过 add_jar 方式给出是不可以的。有人知道原因吗。 yidan zhao 于2022年7月27日周三 10:40写道: > > pyflink情况 flink-sql-connector-kafka-1.15.0.jar 可以。 > 但 flink-connector-base-1.15.0.jar + flink-connector-kafka-1.15.0.jar + > kafka-clients-2.8.1.jar 却报: > py4j.pro

Re: pyflink 和 add_jars 的 add_classpaths 路径。

2022-07-26 文章 yidan zhao
pyflink情况 flink-sql-connector-kafka-1.15.0.jar 可以。 但 flink-connector-base-1.15.0.jar + flink-connector-kafka-1.15.0.jar + kafka-clients-2.8.1.jar 却报: py4j.protocol.Py4JError: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer does not exist in the JVM Weihua Hu 于2022年7月26日周二 21:21写道

Re: pyflink 和 add_jars 的 add_classpaths 路径。

2022-07-26 文章 Weihua Hu
最终会放到 pipeline.jars 配置中,在提交作业时上传到 blobServer Best, Weihua On Tue, Jul 26, 2022 at 5:40 PM yidan zhao wrote: > 如题,我看注释和文档。 > add_jars 是添加要upload到cluster的jar,那么上传到什么路径呢? >

pyflink 和 add_jars 的 add_classpaths 路径。

2022-07-26 文章 yidan zhao
如题,我看注释和文档。 add_jars 是添加要upload到cluster的jar,那么上传到什么路径呢?

Re: 关于PyFlink的环境问题,期望得到回复。

2022-06-19 文章 LuNing Wang
Yuxia > > - 原始邮件 - > 发件人: "张 兴博" > 收件人: "user-zh" > 发送时间: 星期一, 2022年 6 月 20日 上午 8:54:21 > 主题: 关于PyFlink的环境问题,期望得到回复。 > > 您好: > > > 我是西南财经大学一个在读博士,目前想使用PyFlink,但是遇到了一个问题,我的集群采用的CDH6.2.0,其中的Hadoop为3.0.0版本。当我在pytlink

Re: 关于PyFlink的环境问题,期望得到回复。

2022-06-19 文章 LuNing Wong
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/yarn/#preparation 张 兴博 于2022年6月20日周一 09:36写道: > 您好: > > > 我是西南财经大学一个在读博士,目前想使用PyFlink,但是遇到了一个问题,我的集群采用的CDH6.2.0,其中的Hadoop为3.0.0版本。当我在pytlink的程序里引入hadoop-common-3.0.0.jar(或hadoop-common-3.

Re: 关于PyFlink的环境问题,期望得到回复。

2022-06-19 文章 yuxia
.jar Best regards, Yuxia - 原始邮件 - 发件人: "张 兴博" 收件人: "user-zh" 发送时间: 星期一, 2022年 6 月 20日 上午 8:54:21 主题: 关于PyFlink的环境问题,期望得到回复。 您好: 我是西南财经大学一个在读博士,目前想使用PyFlink,但是遇到了一个问题,我的集群采用的CDH6.2.0,其中的Hadoop为3.0.0版本。当我在pytlink的程序里引入hadoop-common-3.0.0.jar(或hadoop-common-3.0.0

关于PyFlink的环境问题,期望得到回复。

2022-06-19 文章 张 兴博
您好: 我是西南财经大学一个在读博士,目前想使用PyFlink,但是遇到了一个问题,我的集群采用的CDH6.2.0,其中的Hadoop为3.0.0版本。当我在pytlink的程序里引入hadoop-common-3.0.0.jar(或hadoop-common-3.0.0-cdh6.2.0.jar)的时候,运行程序就会报错: Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.security.UserGroupInformation

Re: 关于PyFlink的开发环境问题

2022-06-16 文章 Weihua Hu
Huang wrote: > Hi, > > 你可以执行 pip install -r flink-python/dev/dev-requirements.txt 安装开发环境所需要的依赖 > > Best, > Xingbo > > 张 兴博 于2022年6月15日周三 10:20写道: > > > 您好: > > 我是一名学习使用pyflink的用户,我想在ubuntu20.04上开发pyflink,但是在运行代码的时候,报错为: > > > > Traceback

Re: 关于PyFlink的开发环境问题

2022-06-15 文章 Xingbo Huang
Hi, 你可以执行 pip install -r flink-python/dev/dev-requirements.txt 安装开发环境所需要的依赖 Best, Xingbo 张 兴博 于2022年6月15日周三 10:20写道: > 您好: >我是一名学习使用pyflink的用户,我想在ubuntu20.04上开发pyflink,但是在运行代码的时候,报错为: > > Traceback (most recent call last): > File "/root/.py"

关于PyFlink的开发环境问题

2022-06-14 文章 张 兴博
您好: 我是一名学习使用pyflink的用户,我想在ubuntu20.04上开发pyflink,但是在运行代码的时候,报错为: Traceback (most recent call last): File "/root/.py", line 6, in s_env = StreamExecutionEnvironment.get_execution_environment() File "/usr/local/lib/python3.8/dist-packages/pyf

Re:Re: pyflink报错:Java gateway process exited before sending its port number

2022-05-23 文章 RS
Hi, 感谢,查询pyflink目录下,里面确实存在多个版本的jar包,我清理了下,可以运行起来了, 看来是PyCharm的bug了,安装新版本的时候没有成功清理旧的版本 Thanks~ 在 2022-05-23 19:27:42,"Dian Fu" 写道: >>> java.lang.NoSuchMethodError: >org.apache.flink.util.NetUtils.getAvailablePort()I > >你的环境是不是不太干净?可以检查一下 PyFlink 安装目录下(site-packag

Re: pyflink报错:Java gateway process exited before sending its port number

2022-05-23 文章 Dian Fu
>> java.lang.NoSuchMethodError: org.apache.flink.util.NetUtils.getAvailablePort()I 你的环境是不是不太干净?可以检查一下 PyFlink 安装目录下(site-packages/pyflink/lib) 的那些 jar 包的版本。 On Mon, May 23, 2022 at 4:22 PM RS wrote: > Hi, > 在Pycharm中,测试Pyflink示例代码,启动运行报错,代码为官方文档中的代码 > 参考官方文档: > https://ni

pyflink报错:Java gateway process exited before sending its port number

2022-05-23 文章 RS
Hi, 在Pycharm中,测试Pyflink示例代码,启动运行报错,代码为官方文档中的代码 参考官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/table_api_tutorial/ 报错如下: Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.NoSuchM

Re: Pyflink 1.14 维表 DDL报错

2022-02-06 文章 Caizhi Weng
PRIMARY KEY(currency) NOT ENFORCED ) WITH ( > 'connector' = 'kafka', 'value.format' = 'debezium-json',/* ... */ > );基于以上,我的疑惑共有几点:1.是维表不能使用‘datagen’数据来源吗?还是pyflink 1.14 > 目前不支持维表ddl?2.阿里云的维表ddl语句和官网的维表ddl语句有什么区别吗?他们的使用场景是什么?3.如何能够让我在练习的时候正确使用维表join请大家指点一下,感激不尽

Pyflink 1.14 ???? DDL????

2022-02-06 文章 ??????
MARK FOR update_time AS update_time, PRIMARY KEY(currency) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'value.format' = 'debezium-json',/* ... */ );1.datagen??pyflink 1.14 ??ddl??2.dd

?????? ??????????????downloads/setup-pyflink-virtual-env.sh????

2021-11-22 文章 Asahi Lee
2021 at 10:48 AM Asahi Lee <978466...@qq.com.invalidgt; wrote: gt; ??source my_env/bin/activate??PYFLINK_CLIENT_EXECUTABLE?? gt; jobmanagerNo module named pyflinkjobmanageryarn gt; ??

Re: 哪里可以下载到downloads/setup-pyflink-virtual-env.sh脚本

2021-11-21 文章 Dian Fu
Flink用的1.14.0,venv.zip中的PyFlink版本是多少? On Sun, Nov 21, 2021 at 7:59 PM Asahi Lee wrote: > Hi! > 我通过如下命令提交成功了,python的参数需求-D方式传入,-py方式传入不生效: > ./flink-1.14.0/bin/flink > run-application -t yarn-application > -Dyarn.provided.lib.dirs="hdfs://nameservice1

?????? ??????????????downloads/setup-pyflink-virtual-env.sh????

2021-11-21 文章 Asahi Lee
??source my_env/bin/activate??PYFLINK_CLIENT_EXECUTABLE?????? jobmanagerNo module named pyflinkjobmanageryarn ?? gt; LogType:jobmanager.out gt; Log Upload Time:?? ?? 18 20:48:45 +0800 2021 gt;

Re: 哪里可以下载到downloads/setup-pyflink-virtual-env.sh脚本

2021-11-18 文章 Dian Fu
刚注意到你用的YARN application模式,PyFlink 1.14.0才支持YARN application模式,主要是新增了命令行选项“ -pyclientexec” 和配置“python.client.executable”: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/python_config/#python-client-executable 对于你这个作业来说,你需要通过使用1.14.0版本,同时添加命令行选项:-pyclientexec venv.zip

?????? ??????????????downloads/setup-pyflink-virtual-env.sh????

2021-11-18 文章 Asahi Lee
??source my_env/bin/activate??PYFLINK_CLIENT_EXECUTABLE?? jobmanagerNo module named pyflinkjobmanageryarn ?? LogType:jobmanager.out Log Upload Time:?? ?? 18 20:48:45 +0800 2021

Re: 哪里可以下载到downloads/setup-pyflink-virtual-env.sh脚本

2021-11-18 文章 Dian Fu
a Table api中使用python udf > 函数,通过下面的命令提交应用,报无法启动python服务错误,请问我的提交方式对吗?jm日志为/bin/python: No module named > pyflink。 > > > ./flink-1.13.2/bin/flink > run-application -t yarn-application > -Dyarn.provided.lib.dirs="hdfs://nameservice1/user/flink/flinklib" > -Dyarn.appl

????????????????????downloads/setup-pyflink-virtual-env.sh????

2021-11-18 文章 Asahi Lee
Hi ! java Table api??python udf pythonjm??/bin/python: No module named pyflink?? ./flink-1.13.2/bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://nameservice1/user/flink/fli

Re:??????????????downloads/setup-pyflink-virtual-env.sh????

2021-11-17 文章 zxyoung
Hi!??https://nightlies.apache.org/flink/flink-docs-release-1.12/downloads/setup-pyflink-virtual-env.sh ?? 2021-11-18 15:05:03??"Asahi Lee" <978466...@qq.com.INVALID> ?? >Hi! > >flink??setup-pyflink-virtua

??????????????downloads/setup-pyflink-virtual-env.sh????

2021-11-17 文章 Asahi Lee
Hi! flink??setup-pyflink-virtual-env.sh python?? https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/python/faq/

Re: flink 1.13.2 在 Java/Scala 程序中调用 Python UDF函数,通过yarn-application执行,yarn集群的每台机器都需要安装pyflink?

2021-11-09 文章 Dian Fu
FYI On Wed, Nov 10, 2021 at 9:31 AM Dian Fu wrote: > 也可以通过以下方式: > - Python libraries [1]: 把PyFlink以及其他相关依赖打包,作为依赖指定 > - Python archieves [2]: 构建Python虚拟环境,并在里面安装PyFlink以及其他依赖,作为依赖指定 > > > 但是上述方式相对于直接在集群上安装来说,提交作业的时候,Flink内部需要把相关文件分发到集群节点上,如果文件比较大,有一点的overhead,会降低启动速

Re: flink 1.13.2 在 Java/Scala 程序中调用 Python UDF函数,通过yarn-application执行,yarn集群的每台机器都需要安装pyflink?

2021-11-09 文章 Dian Fu
也可以通过以下方式: - Python libraries [1]: 把PyFlink以及其他相关依赖打包,作为依赖指定 - Python archieves [2]: 构建Python虚拟环境,并在里面安装PyFlink以及其他依赖,作为依赖指定 但是上述方式相对于直接在集群上安装来说,提交作业的时候,Flink内部需要把相关文件分发到集群节点上,如果文件比较大,有一点的overhead,会降低启动速度。 [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python

flink 1.13.2 ?? Java/Scala ?????????? Python UDF??????????yarn-application??????yarn????????????????????????pyflink?

2021-11-08 文章 Asahi Lee
HI! ??flink 1.13.2??java table apipython udf??yarn-applicationyarn??pyflink?

Re: pyflink 1.14.0 udf 执行报错,根据官网写的代码

2021-10-20 文章 Dian Fu
图挂了,邮件列表不能直接发图片。可以发一下更详细的日志信息吗? On Tue, Oct 19, 2021 at 6:34 PM xuzh wrote: > 错误日志 > Exception in thread Thread-14: > Traceback (most recent call last): > File "D:\Anaconda3\envs\py37\lib\threading.py", line 926, in > _bootstrap_inner > self.run() > File >

?????? pyflink 1.14.0 udf ??????????????????????????

2021-10-19 文章 xuzh
Exception in thread Thread-14: Traceback (most recent call last): File "D:\Anaconda3\envs\py37\lib\threading.py", line 926, in _bootstrap_inner self.run() File "D:\Anaconda3\envs\py37\lib\site-packages\apache_beam\runners\worker\data_plane.py", line 218, in run while not

?????? pyflink 1.14.0 udf ??????????????????????????

2021-10-19 文章 xuzh
---- ??: "user-zh"

?????? pyflink 1.14.0 udf ??????????????????????????

2021-10-19 文章 xuzh
??udfudfjar?? -- -- ??:

?????? pyflink 1.14.0 udf ??????????????????????????

2021-10-19 文章 xuzh
??udfudfjar?? ---- ??:

Re: pyflink 1.14.0 udf 执行报错,根据官网写的代码

2021-10-18 文章 Dian Fu
我试了一下是可以运行的,可以发一下报错吗? On Mon, Oct 18, 2021 at 6:44 PM xuzh wrote: > from pyflink.table import ScalarFunction, EnvironmentSettings, > TableEnvironment, DataTypes > from pyflink.table.udf import udf > from pyflink.table.expressions import call, row > > > class HashCode(ScalarFunction): > def

pyflink 1.14.0 udf ??????????????????????????

2021-10-18 文章 xuzh
from pyflink.table import ScalarFunction, EnvironmentSettings, TableEnvironment, DataTypes from pyflink.table.udf import udf from pyflink.table.expressions import call, row class HashCode(ScalarFunction): def __init__(self): self.factor = 12 def eval(self, s): return

pyflink安装好后启动flink cluster失败

2021-09-04 文章 casel.chen
ight", "credits" or "license" for more information. >>> import pyflink >>> pyflink.__file__ '/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pyflink/__init__.py' >>> exit() $ cd /Library/Frameworks/Pytho

Re: Pyflink中使用ConnectedStream时,key_by后分区的问题

2021-07-13 文章 Fei Zhao
1]).process(MyFunction()) > results.print() > > env.execute("test_job") > > if __name__ == "__main__": > main() > > > Dian Fu 于2021年7月12日周一 下午4:48写道: > >> Hi, >> >> 是否发一下可复现的完整示例? >> >> Regards, >> D

Re: Pyflink中使用ConnectedStream时,key_by后分区的问题

2021-07-12 文章 赵飞
t_job") if __name__ == "__main__": main() -- On 2021/07/12 08:47:59, Dian Fu wrote: > Hi, > > 是否发一下可复现的完整示例? > > Regards, > Dian > > > 2021年7月10日 下午7:44,赵飞 写道: > > > > 各位好,请教一个问题。 > > > > 最近我在使用pyflink开发一个模块,主要的功能是基于规则对用户数据

Re: Pyflink中使用ConnectedStream时,key_by后分区的问题

2021-07-12 文章 赵飞
2021年7月12日周一 下午4:48写道: > Hi, > > 是否发一下可复现的完整示例? > > Regards, > Dian > > > 2021年7月10日 下午7:44,赵飞 写道: > > > > 各位好,请教一个问题。 > > > > > 最近我在使用pyflink开发一个模块,主要的功能是基于规则对用户数据进行计算和判断。涉及到两个流:数据流(data)和规则流(rule),两者都包含一个产品id值,所以将该值作为key来分区,处理的代码

Re: Pyflink中使用ConnectedStream时,key_by后分区的问题

2021-07-12 文章 Dian Fu
Hi, 是否发一下可复现的完整示例? Regards, Dian > 2021年7月10日 下午7:44,赵飞 写道: > > 各位好,请教一个问题。 > > 最近我在使用pyflink开发一个模块,主要的功能是基于规则对用户数据进行计算和判断。涉及到两个流:数据流(data)和规则流(rule),两者都包含一个产品id值,所以将该值作为key来分区,处理的代码大致如下: > > --- > results = data.connect(rules).key_by('product_id', > 'prod

Pyflink中使用ConnectedStream时,key_by后分区的问题

2021-07-10 文章 赵飞
各位好,请教一个问题。 最近我在使用pyflink开发一个模块,主要的功能是基于规则对用户数据进行计算和判断。涉及到两个流:数据流(data)和规则流(rule),两者都包含一个产品id值,所以将该值作为key来分区,处理的代码大致如下: --- results = data.connect(rules).key_by('product_id', 'product_id').process(MyFunction()) results.print() class MyFunction(KeyedCoProcessFunction): def open(self, ctx

Re: pyflink kafka connector报错ByteArrayDeserializer

2021-06-02 文章 Dian Fu
要用fat jar: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.13.1/flink-sql-connector-kafka_2.11-1.13.1.jar > 2021年6月2日 下午2:43,qianhuan <819687...@qq.com> 写道: > > 版本: > python 3.8 > apache-flink 1.13.1 > apache-flink-libraries 1.13.1 > > 代码: >

Re: pyflink kafka connector报错ByteArrayDeserializer

2021-06-02 文章 qianhuan
是不是connector版本问题,之前1.12.2可以跑,有没有大神帮忙看下 -- Sent from: http://apache-flink.147419.n8.nabble.com/

pyflink kafka connector报错ByteArrayDeserializer

2021-06-02 文章 qianhuan
版本: python 3.8 apache-flink 1.13.1 apache-flink-libraries 1.13.1 代码: from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, EnvironmentSettings def log_processing(): env = StreamExecutionEnvironment.get_execution_environment()

?????? Pyflink jdbc????

2021-06-01 文章 ????
https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q ????pyflink??jdbc??jar??jdbc??flink??1.13.1 gt; gt; gt; gt; from pyflink.datastream import StreamExecutionEnvironment gt; from pyflink.table import StreamTableE

Re: Pyflink jdbc相关

2021-06-01 文章 Dian Fu
gt; 发送时间:2021年6月1日(星期二) 下午5:30 > 收件人:"user-zh" > 主题:回复: Pyflink jdbc相关 > > > > > > 感谢,我换成2.11确实可以了 > > > -- 原始邮件 -- > 发件人:

?????? Pyflink jdbc????

2021-06-01 文章 ????
; ?? ---- ??: "" <1129656...@qq.com; :2021??6??1??(??) 5:30 ??:"user-zh"https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q pyflink

?????? Pyflink jdbc????

2021-06-01 文章 ????
er-zh" https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q ????pyflink??jdbc??jar??jdbc??flink??1.13.1 from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableE

Re: Pyflink jdbc相关

2021-06-01 文章 Dian Fu
Hi, 本地执行: 1)用flink-connector-jdbc_2.11-1.13.1.jar试试?因为PyFlink里默认待的JAR包是scala 2.11的 flink run: 1) 你注册的sink表的名字为“print”,不是”table_sink”,但是在SQL语句里用的table_sink。 > 2021年6月1日 下午4:33,琴师 <1129656...@qq.com> 写道: > > Hi, > 我按着微信分享https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q >

Pyflink jdbc????

2021-06-01 文章 ????
Hi?? ??https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q pyflink??jdbc??jar??jdbc??flink??1.13.1 from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment

Re: pyflink双流join

2021-05-16 文章 qianhuan
非常感谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: pyflink双流join

2021-05-16 文章 Dian Fu
用Table API的话,可以看一下这个: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tableapi/#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tableapi/#joins> 另外,也可以直接在PyFlink中调用SQL语句: https://ci.apache.org/projects/flink/flink-docs-releas

pyflink双流join

2021-05-16 文章 qianhuan
想实现pyflink双流join,没有找到相关示例,有没有大神指导下用pyflink是否能实现? -- Sent from: http://apache-flink.147419.n8.nabble.com/

PyFlink调查问卷

2021-04-20 文章 Dian Fu
Hi all, 【填问卷,抽奖送T恤!】它来了它来了 ~ 为了更好地服务 PyFlink 用户,帮助 PyFlink 用户将 PyFlink 应用到生产环境中,Apache Flink 中文社区接下来计划推出一系列 PyFlink 的相关的文档及参考资料,让 PyFlink 用户得到更多优质的 PyFlink 学习资料! 为此我们推出这个调查问卷,了解大家感兴趣的内容,希望大家积极参与这个问卷,帮助我们更好的去整理 PyFlink 相关学习资料~ PS:填完问卷后即可参与抽奖,Flink 定制款 Polo 衫送送送!4月30日中午12:00准时开奖哦 ~ https

Re: pyflink kafka connector报错

2021-04-19 文章 qianhuan
非常感谢,已解决,sql写错了。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: pyflink kafka connector报错

2021-04-19 文章 Dian Fu
把flink-connector-kafka_2.11-1.12.2.jar删了 > 2021年4月19日 下午6:08,qianhuan <819687...@qq.com> 写道: > > 感谢回复 > 导入了flink-sql-connector-kafka_2.11-1.12.2.jar和flink-connector-kafka_2.11-1.12.2.jar,并且放到了site-packages/pyflink/lib目录下,还是一样的报错。 > test_source_table_1这个ka

Re: pyflink kafka connector报错

2021-04-19 文章 qianhuan
感谢回复 导入了flink-sql-connector-kafka_2.11-1.12.2.jar和flink-connector-kafka_2.11-1.12.2.jar,并且放到了site-packages/pyflink/lib目录下,还是一样的报错。 test_source_table_1这个kafka的表应该是创建成功了,是查询的问题吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: pyflink kafka connector报错

2021-04-19 文章 Dian Fu
'topic' = 'test1', > 'properties.bootstrap.servers' = 'localhost:9092', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'json' >) >""" > pyflink版本: > apache-flink 1.12.2 > > 导

pyflink kafka connector报错

2021-04-19 文章 qianhuan
'localhost:9092', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ) """ pyflink版本: apache-flink 1.12.2 导入的jar包:flink-connector-kafka_2.11-1.12.2.jar python执行报错信息: py4j.protocol.Py4JJavaError: An error occu

Re: Re: Re: pyflink 运行提示:Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable

2021-04-15 文章 magichuang
感谢~ 通过多次调试 是打的venv 包有问题, 已经解决了 现在可以在集群上跑了谢谢~ > -- 原始邮件 -- > 发 件 人:"Dian Fu" > 发送时间:2021-04-15 10:32:49 > 收 件 人:user-zh ,magichu...@88.com > 抄 送: > 主 题:Re: Re: pyflin

Re: Re: pyflink 运行提示:Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable

2021-04-14 文章 Dian Fu
命令:/opt/flink-1.11.2/bin/flink run -m yarn-cluster -ynm traffic -ys 1 > -ytm 1024m -p 1 -py traffic.py > > > > > > -- 原始邮件 -- > > 发 件 人:"Dian Fu" > > 发送时间:2021-04-14 23:11:57 > > 收 件 人:user-zh > > 抄 送: > > 主

Re: Re: pyflink 运行提示:Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable

2021-04-14 文章 magichuang
-- > 发 件 人:"Dian Fu" > 发送时间:2021-04-14 23:11:57 > 收 件 人:user-zh > 抄 送: > 主 题:Re: pyflink 运行提示:Function class 'class > org.apache.flink.table.functions.python.PythonScalarFunction' is not > serializable > > 你JDK版本多少? 看起来像是Java环境的问题。这里有一个相似的问题[1],看下是否有帮

Re: pyflink 运行提示:Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable

2021-04-14 文章 Dian Fu
ink on > yarn,per-job模式 > > 程序使用pyflink开发的,从kafka读取数据,然后通过udf 判断一个IP是否在一个IP段中,引入的第三方包是IPy,然后再写入kafka中 > > > > > 主要代码 > > t_env.get_config().get_configuration().set_string('taskmanager.memory.task.off-heap.size', > '128m') > > t_env.get_config().get_configuration().

pyflink 运行提示:Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable

2021-04-14 文章 magichuang
flink版本:1.11.2 Python版本 3.6 apache-flink==1.11.2, 用的是flink on yarn,per-job模式 程序使用pyflink开发的,从kafka读取数据,然后通过udf 判断一个IP是否在一个IP段中,引入的第三方包是IPy,然后再写入kafka中 主要代码 t_env.get_config().get_configuration().set_string('taskmanager.memory.task.off-heap.size', '128m') t_env.get_config

Re: pyflink资源优化问题,请教

2021-04-05 文章 Dian Fu
处理逻辑看起来应该是没有问题的。 1)可以详细说一下,你说的数据延迟问题吗?现在的qps可以达到多少,预期是多少? 2)你现在用的哪种部署模式? 3)并发度的设置可以参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/table_environment.html#configuration

回复:pyflink资源优化问题,请教

2021-04-05 文章 郭华威
hidden email 在2021年04月06日 11:36,苗红宾 写道: 你好: 业务场景是:数据源是kafka,10分钟总数据量在10G左右,里面包括200个城市的数据,期望使用滚动窗口,按城市分组,每2分钟触发一次将所有城市的过去十分钟数据放到各自的list里,然后转换成pandas,针对该城市做一次整体计算,每次每个城市的计算耗时平均在60s左右。 现在的使用方式: 1、slide_window = Slide.over(f"10.minutes").every(f"2.minutes").on('ts').alias("w") 2、使用sql语句注册kafka

pyflink资源优化问题,请教

2021-04-05 文章 苗红宾
你好: 业务场景是:数据源是kafka,10分钟总数据量在10G左右,里面包括200个城市的数据,期望使用滚动窗口,按城市分组,每2分钟触发一次将所有城市的过去十分钟数据放到各自的list里,然后转换成pandas,针对该城市做一次整体计算,每次每个城市的计算耗时平均在60s左右。 现在的使用方式: 1、slide_window = Slide.over(f"10.minutes").every(f"2.minutes").on('ts').alias("w") 2、使用sql语句注册kafka connector, 3、result table使用普通的print: CREATE

Re: pyflink UDTF求助!

2021-03-18 文章 陈康
感谢回复! -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: pyflink UDTF求助!

2021-03-18 文章 Xingbo Huang
Hi, 经过排查,这个确实一个bug。问题出在没有正确处理在sub-query中使用的python udf。我已经创建JIRA[1] 来记录这个问题了。目前的workaroud方案是使用Table API。 具体可以参考下面的代码: >>> a = t_env.sql_query(""" SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith ,

Re: pyflink UDTF求助!

2021-03-17 文章 陈康
apache-flink 1.11.1 -- Sent from: http://apache-flink.147419.n8.nabble.com/

pyflink UDTF求助!

2021-03-17 文章 陈康
quot;, line 280, in t_env.execute('NT重连预测参数') File "D:\tools\Python3.6.5\lib\site-packages\pyflink\table\table_environment.py", line 1057, in execute return JobExecutionResult(self._j_tenv.execute(job_name)) File "D:\tools\Python3.6.5\lib\site-packages\py4j\java_gateway.

Re: Re: pyflink使用的一些疑问

2021-03-17 文章 Xingbo Huang
Hi, 其实pyflink作业就两种,一种是用了python udf的,一种是没用python udf 1. 对于没用python udf的作业,你写的所有python代码就是api层调用,只负责在客户端编译作业。你可以认为实际运行的作业代码全都是java的同一套代码,都是在JVM里面跑的,也就不存在性能差别,如果你觉得哪个操作性能不行,那就得去分析java对应算子的性能问题。 2. 对于用了python udf的作业,因为你写的udf函数内容是python代码,这种代码在运行时JVM不认识,你需要有PVM执行这种代码,所以会起python进程专门执行udf里面的内容,所以涉及到IPC通

Re:Re: pyflink使用的一些疑问

2021-03-16 文章 xiaoyue
Hi, Xingbo 想跟您了解一下关于sql_query执行上的细节,flink1.12版本底层执行sql语句的过程中,是否有谓词下退的优化? 从相关的代码测试结果看: 1. pyflink1.11版本的connector定义支持参数read.query来获取数据,执行效率很高,猜测这部分执行交由数据库完成; 2. pyflink1.12版本取消了read.query参数,当定义多个数据源执行join等操作时,耗时很明显(pyflink) 所以,基于上述这种情况,想跟您请教一下这部分耗时,也是因为python的语言缺陷,或者ipc开销?还是底层的实

Re: Pyflink 提交到本地集群报错

2021-03-16 文章 Huilin_WU
你好,谢谢你的回复,现在更新到V1.12就可以直接运行了 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: pyflink使用的一些疑问

2021-03-16 文章 Xingbo Huang
-aggregate-functions Best, Xingbo xiaoyue 于2021年3月16日周二 上午11:42写道: > 您好, > 目前同样在做pyflink 结合pandas的分布式计算调研和尝试,对于您的问题,仅有一些经验性的分享。 > pyflink以后应该都会集成到DataStream,所以应该不会再支持DataSet; > 不建议在计算中间采用 table.to_pandas()的方式进行table和dataFrame互转,会影响计算效率; > 目前采用的计算效率较好的方式,是定义pandas类型的udf/udaf

Re: pyflink使用的一些疑问

2021-03-15 文章 xiaoyue
您好, 目前同样在做pyflink 结合pandas的分布式计算调研和尝试,对于您的问题,仅有一些经验性的分享。 pyflink以后应该都会集成到DataStream,所以应该不会再支持DataSet; 不建议在计算中间采用 table.to_pandas()的方式进行table和dataFrame互转,会影响计算效率; 目前采用的计算效率较好的方式,是定义pandas类型的udf/udaf方式,但相较java版接口同样的方式,pyflink还是会慢很多; 个人感觉,pyflink耗时较多的地方,还是sql_query的操作,相同sql语句,执行效率

Re: Pyflink dataset没有支持相关map reduce函数

2021-03-15 文章 Dian Fu
Hi, 有几个疑问: 1)你说的map reduce函数具体指的什么?可以举一个例子吗? 2)DataSet API指的是Java的DataSet API吗?另外,Java的DataSet API会逐步废弃,统一到DataStream API上来,所以PyFlink里不会支持DataSet API,只支持Python Table API和Python DataStream API > 2021年3月13日 上午10:54,nova.he 写道: > > 你好, > > 最近项目想使用flink进行分布式计算,之前项目是Pytho

Re: 关于pyflink LATERAL TABLE 问题请教

2021-03-15 文章 陈康
简单提供了下 可复现的例子,请帮忙看看~谢谢! -- Sent from: http://apache-flink.147419.n8.nabble.com/

pyflink使用的一些疑问

2021-03-14 文章 qian he
你好, 最近项目想使用flink进行分布式计算,之前项目是Python的pandas项目,想尝试用pyflink进行项目改造,在使用dataset做批处理时,对于Java的版本没有相关map reduce函数,所以有以下疑问: 1.Python flink的SDK还没支持dataset吗? 2.是不是有其他替代方法? 3.如果还没支持,有计划支持的时间吗? 4.flink table为啥不支持map reduce操作? 5.我们项目使用dataframe来处理数据,能放到flink上做分布式运算吗?dataframe直接转化为table的方式,table不支持map reduce操作,对

pyflink中 datastream没有实现windows方法

2021-03-13 文章 heqianfly

Re: 关于pyflink LATERAL TABLE 问题请教

2021-03-13 文章 陈康
apache-flink 1.11.1 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 关于pyflink LATERAL TABLE 问题请教

2021-03-13 文章 陈康
### ## 可执行 ### t_env.sql_query(""" SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(hotime, before_ta) predict FROM source """).insert_into("print_table") t_env.execute('pyflink UDTF') -- Sent from: http://apache-flink.147419.n8.nabble.com/

Pyflink dataset没有支持相关map reduce函数

2021-03-13 文章 nova.he
你好, 最近项目想使用flink进行分布式计算,之前项目是Python的pandas项目,想尝试用pyflink进行项目改造,在使用dataset做批处理时,没有相关map reduce函数,所以有以下疑问: 1.Python flink的SDK还没支持dataset吗? 2.是不是有其他替代方法? 3.如果还没支持,有计划支持的时间吗? 4.flink table为啥不支持map reduce操作? 5.我们项目使用dataframe来处理数据,能放到flink上做分布式运算吗?dataframe直接转化为table的方式,table不支持map reduce操作,对应

Re: Pyflink 提交到本地集群报错

2021-03-12 文章 Dian Fu
从报错看,似乎是作业运行的时候,找不到pyflink,如果确实是这样的话,有几个解决方案: - 通过API指定集群端的Python路径: set_python_executable,参考 [1] - 通过配置python.executable,参考[2] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/dependency_management.html [2] https://ci.apache.org/projects/flink

Re: Pyflink如何对接HBase?

2021-03-12 文章 Dian Fu
1)在PyFlink Table API中可以使用所有SQL中支持的connector,所以HBase connector也自然支持,具体使用方式可以看一下文档: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#how-to-use-connectors 2)HBase connector的使用方式可以看一下: https://ci.apache.org/projects/flink

  1   2   3   4   5   >