Re: Install/Run Streaming Anomaly Detection R package in Flink

2021-02-23 Thread Wei Zhong
Hi Robert,

If you do not want to install the library on every machine of the cluster, you can 
use the Python dependency management API to upload the required dependencies to the 
cluster and use them there.

For this case, I recommend building a portable Python environment that contains 
all the required dependencies. You can call `add_python_archives` to upload the 
environment to your cluster and call `set_python_executable` to set the path of the 
Python interpreter inside that environment.
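
For example, a minimal sketch of that approach (it assumes the archive is named venv.zip, 
was packed with venv-pack as described in the FAQ linked below, and contains a top-level 
directory named venv):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())

# Ship the packed virtual environment to the cluster together with the job.
t_env.add_python_archive("venv.zip")

# Point the Python workers at the interpreter inside the uploaded archive.
t_env.get_config().set_python_executable("venv.zip/venv/bin/python")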

For more detailed information, you can refer to the following links.

Documentation of the Python dependency management API and configuration:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/dependency_management.html#python-dependency-in-python-program
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives

How to build a portable python environment:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/faq.html#preparing-python-virtual-environment

Best,
Wei

> On Feb 24, 2021, at 01:38, Roman Khachatryan wrote:
> 
> Hi,
> 
> I'm pulling in Wei Zhong and Xingbo Huang who know PyFlink better.
> 
> Regards,
> Roman
> 
> 
> On Mon, Feb 22, 2021 at 3:01 PM Robert Cullen <cinquate...@gmail.com> wrote:
> My customer wants us to install this package in our Flink Cluster:
> 
> https://github.com/twitter/AnomalyDetection
> 
> One of our engineers developed a python version:
> 
> https://pypi.org/project/streaming-anomaly-detection/
> 
> Is there a way to install this in our cluster?
> 
> -- 
> Robert Cullen
> 240-475-4490



Re: Can PyFlink's py4j call my own Java program?

2021-02-05 Thread Wei Zhong
Try calling:
get_gateway().jvm.Test2.Test2.main(None)
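
An end-to-end sketch of that call (it assumes the jar really is located at /root/Test2.jar 
and contains the class Test2.Test2 shown in the quoted mail below):

from pyflink.util.utils import add_jars_to_context_class_loader
from pyflink.java_gateway import get_gateway

# Hot-load the jar into the context class loader; the path must be a URL.
add_jars_to_context_class_loader(['file:///root/Test2.jar'])

# The class lives in the package "Test2", so its fully qualified name is
# Test2.Test2; main(String[] args) takes one argument, hence the explicit None.
get_gateway().jvm.Test2.Test2.main(None)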

> On Feb 5, 2021, at 18:27, 瞿叶奇 <389243...@qq.com> wrote:
> 
> Hi, with a list argument the error is gone, but the class still doesn't seem to be loaded.
> >>> from pyflink.util.utils import add_jars_to_context_class_loader
> >>> add_jars_to_context_class_loader(['file:///root/Test2.jar'])
> >>> from pyflink.java_gateway import get_gateway
> >>> get_gateway().jvm.Test2.main()
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File 
> "/root/qyq_f/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", 
> line 191, in wrapped_call
> TypeError: Could not found the Java class 'Test2.main'. The Java dependencies 
> could be specified via command line argument '--jarfile' or the config option 
> 'pipeline.jars'
> java code:
> package Test2;
> public class Test2 {
> public int add(int a, int b) {
> return a + b;
> }
> 
> public static void main(String[] args) {
> int a = 1;
> int b = 2;
> Test2 t2=new Test2();
> int c=t2.add(a,b);
> System.out.print(c);
> }
> }
> 
> 
> -- Original Message --
> From: "user-zh" <weizhong0...@gmail.com>;
> Date: Friday, Feb 5, 2021, 6:01 PM
> To: "user-zh" <user-zh@flink.apache.org>;
> Subject: Re: Can PyFlink's py4j call my own Java program?
> 
> The image doesn't seem to load, but my guess is that it is an argument type issue? This function expects an argument of type List:
> add_jars_to_context_class_loader(["file:///xxx"])
> 
> > On Feb 5, 2021, at 17:48, 瞿叶奇 <389243...@qq.com> wrote:
> > 
> > Hi,
> > I have upgraded to flink 1.12.0, but loading the class with this function reports an error. Is there something wrong with my URL? Does pyflink have a way to connect to HDFS with Kerberos authentication?
> > 
> > 
> > 
> > 
> > -- Original Message --
> > From: "user-zh";
> > Date: Friday, Feb 5, 2021, 3:53 PM
> > To: "user-zh";
> > Subject: Re: Can PyFlink's py4j call my own Java program?
> > 
> > Hi,
> > 
> > First you need to package your Java program into a jar and then hot-load that jar. PyFlink currently has a util method you can call directly:
> > from pyflink.util.utils import add_jars_to_context_class_loader
> > add_jars_to_context_class_loader("file:///xxx")  # note: the path must be in URL format
> > 
> > Then you can call it through the java gateway:
> > from pyflink.java_gateway import get_gateway
> > get_gateway().jvm.your.class.name.main()
> > 
> > Note that this util method only exists in fairly recent versions, so you may need to upgrade your pyflink version.
> > 
> > > On Feb 5, 2021, at 10:48, 瞿叶奇 <389243...@qq.com> wrote:
> > > 
> > > I want to use a Java program to do Kerberos authentication in the JVM for PyFlink to use. In other words, if it were a function, it would just be a no-argument function that I call. How can I achieve this?
> > > 
> > > 
> > > 
> > > 
> > > -- Original Message --
> > > From: "user-zh" <hxbks...@gmail.com>;
> > > Date: Friday, Feb 5, 2021, 10:35 AM
> > > To: "user-zh";
> > > Subject: Re: Can PyFlink's py4j call my own Java program?
> > > 
> > > 
> > > 
> > > Hi,
> > > 
> > > Do you want to use UDFs written in Java? You can call register_java_function or create_java_temporary_function to register your Java UDFs; see the documentation [1] for details.
> > > 
> > > [1]
> > > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
> > >  
> > > 
> > > 
> > > Best,
> > > Xingbo
> > > 
> > > 
> > > 瞿叶奇 <389243...@qq.com> wrote on Thu, Feb 4, 2021 at 5:53 PM:
> > > 
> > >  How can I get PyFlink's py4j to call my own Java program?
> > 
> > 



Re: Can PyFlink's py4j call my own Java program?

2021-02-05 Thread Wei Zhong
The image doesn't seem to load, but my guess is that it is an argument type issue? This function expects an argument of type List:
add_jars_to_context_class_loader(["file:///xxx"])

> On Feb 5, 2021, at 17:48, 瞿叶奇 <389243...@qq.com> wrote:
> 
> Hi,
> I have upgraded to flink 1.12.0, but loading the class with this function reports an error. Is there something wrong with my URL? Does pyflink have a way to connect to HDFS with Kerberos authentication?
> 
> 
> 
> 
> -- Original Message --
> From: "user-zh";
> Date: Friday, Feb 5, 2021, 3:53 PM
> To: "user-zh";
> Subject: Re: Can PyFlink's py4j call my own Java program?
> 
> Hi,
> 
> First you need to package your Java program into a jar and then hot-load that jar. PyFlink currently has a util method you can call directly:
> from pyflink.util.utils import add_jars_to_context_class_loader
> add_jars_to_context_class_loader("file:///xxx")  # note: the path must be in URL format
> 
> Then you can call it through the java gateway:
> from pyflink.java_gateway import get_gateway
> get_gateway().jvm.your.class.name.main()
> 
> Note that this util method only exists in fairly recent versions, so you may need to upgrade your pyflink version.
> 
> > On Feb 5, 2021, at 10:48, 瞿叶奇 <389243...@qq.com> wrote:
> > 
> > I want to use a Java program to do Kerberos authentication in the JVM for PyFlink to use. In other words, if it were a function, it would just be a no-argument function that I call. How can I achieve this?
> > 
> > 
> > 
> > 
> > -- Original Message --
> > From: "user-zh";
> > Date: Friday, Feb 5, 2021, 10:35 AM
> > To: "user-zh";
> > Subject: Re: Can PyFlink's py4j call my own Java program?
> > 
> > 
> > 
> > Hi,
> > 
> > Do you want to use UDFs written in Java? You can call register_java_function or create_java_temporary_function to register your Java UDFs; see the documentation [1] for details.
> > 
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
> > 
> > Best,
> > Xingbo
> > 
> > 
> > 瞿叶奇 <389243...@qq.com> wrote on Thu, Feb 4, 2021 at 5:53 PM:
> > 
> >  How can I get PyFlink's py4j to call my own Java program?
> 
> 



Re: Can PyFlink's py4j call my own Java program?

2021-02-05 Thread Wei Zhong
Hi,

First you need to package your Java program into a jar and then hot-load that jar. PyFlink currently has a util method you can call directly:
from pyflink.util.utils import add_jars_to_context_class_loader
add_jars_to_context_class_loader("file:///xxx")  # note: the path must be in URL format

Then you can call it through the java gateway:
from pyflink.java_gateway import get_gateway
get_gateway().jvm.your.class.name.main()

Note that this util method only exists in fairly recent versions, so you may need to upgrade your pyflink version.

> On Feb 5, 2021, at 10:48, 瞿叶奇 <389243...@qq.com> wrote:
> 
> I want to use a Java program to do Kerberos authentication in the JVM for PyFlink to use. In other words, if it were a function, it would just be a no-argument function that I call. How can I achieve this?
> 
> 
> 
> 
> -- Original Message --
> From: "user-zh";
> Date: Friday, Feb 5, 2021, 10:35 AM
> To: "user-zh";
> Subject: Re: Can PyFlink's py4j call my own Java program?
> 
> 
> 
> Hi,
> 
> Do you want to use UDFs written in Java? You can call register_java_function or create_java_temporary_function to register your Java UDFs; see the documentation [1] for details.
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
> 
> Best,
> Xingbo
> 
> 
> 瞿叶奇 <389243...@qq.com> wrote on Thu, Feb 4, 2021 at 5:53 PM:
> 
>  How can I get PyFlink's py4j to call my own Java program?



Re: Problem connecting PyFlink to a Kerberos-secured Kafka

2021-02-02 Thread Wei Zhong
Hi,

For the first problem, the matching KDC realm probably cannot be found with your current configuration. You can keep trying to set it manually with System.setProperty, for example:
System.setProperty("java.security.krb5.realm", "");
System.setProperty("java.security.krb5.kdc", "");

For the second problem, 'update-mode'='append' means the sink only accepts append messages from the upstream operator; it does not mean the file is written in append mode. I think the property you actually want to configure is 'format.write-mode'='OVERWRITE'?
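
For example, a minimal sketch of setting them from PyFlink through the Py4J gateway (the realm and KDC values below are placeholders, not taken from your cluster):

from pyflink.java_gateway import get_gateway

System = get_gateway().jvm.System
System.setProperty("java.security.krb5.realm", "EXAMPLE.COM")    # placeholder realm
System.setProperty("java.security.krb5.kdc", "kdc.example.com")  # placeholder KDC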


> On Feb 2, 2021, at 21:17, 瞿叶奇 <389243...@qq.com> wrote:
> 
> Hi!
> Thank you very much for your last answer. The jaas.conf configuration no longer reports errors, but a new problem has appeared. The program is as follows:
> #!/usr/bin/python3.7
> # -*- coding: UTF-8 -*-
> from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
> from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, 
> CsvTableSink, WriteMode, SqlDialect
> from pyflink.table.descriptors import FileSystem,OldCsv,Schema,Kafka,Json
> from pyflink.java_gateway import get_gateway
> System = get_gateway().jvm.System
> System.setProperty("java.security.auth.login.config", 
> "/opt/client2/Flink/flink/conf/jaas.conf")
> System.setProperty("java.security.krb5.conf", "/root/qyq_user/krb5.conf ")
> s_env = StreamExecutionEnvironment.get_execution_environment()
> s_env.set_parallelism(1)
> s_env.enable_checkpointing(3)
> st_env = StreamTableEnvironment.create(s_env, TableConfig())
> st_env.use_catalog("default_catalog")
> st_env.use_database("default_database")
> st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect",
>  
> "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002").property("security.protocol",
>  'SASL_PLAINTEXT').property("sasl.kerberos.service.name", 
> 'kafka').property("kerberos.domain.name", 
> 'hadoop.hadoop.com').property('group.id' 
> ,'example-group1').property("bootstrap.servers", 
> "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id",
>  DataTypes.BIGINT()),DataTypes.FIELD("name", 
> DataTypes.STRING())]))).with_schema(Schema().field("id", 
> DataTypes.BIGINT()).field("name", 
> DataTypes.STRING())).create_temporary_table("sourceKafka")
> fieldNames = ["id", "name"]
> fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()]
> csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", ",", 1, 
> WriteMode.OVERWRITE)
> st_env.register_table_sink("csvTableSink", csvSink)
> resultQuery = st_env.sql_query("select id,name from sourceKafka")
> resultQuery = resultQuery.insert_into("csvTableSink")
> st_env.execute("pyflink-kafka-v4")
> New problem 1):
> 
> I searched online and couldn't find any similar case with a solution.
> New problem 2):
> After creating the HDFS sink table, it can only be written once and then fails (a "file already exists" error):
> <619c3...@682a4270.ce501960.jpg>
> The table is created as follows:
> CREATE TABLE csvTableSink ( id BIGINT,name STRING) WITH ('connector.path'= 
> 'hdfs://hacluster/flink/qyq_qyq13','connector.type'='filesystem','format.type'='csv','update-mode'
>  = 'append')
> I would like to ask what I need to change so that new data gets appended to the file?
> 
> 



Re: How can PyFlink connect to a Kerberos-authenticated Kafka?

2021-01-31 Thread Wei Zhong
Hi,

Judging from your earlier mails, you are currently putting the Kerberos-related configuration into some flink-conf.yaml and then starting a job in local mode, right?

However, the PyFlink shell in local mode does not read any flink-conf.yaml on its own. You need to set the FLINK_HOME environment variable and write the relevant configuration into $FLINK_HOME/conf/flink-conf.yaml, and the contents of flink-conf.yaml are only read when a job is submitted (flink run, remote mode or yarn mode).

If you insist on trying this in local mode, you can use the following code:
from pyflink.java_gateway import get_gateway
System = get_gateway().jvm.System
to get the Java System object and then configure it the same way you would in Java.
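
For example, a minimal sketch (the jaas.conf and krb5.conf paths are placeholders):

from pyflink.java_gateway import get_gateway

System = get_gateway().jvm.System
# Equivalent to passing -Djava.security.auth.login.config=... and -Djava.security.krb5.conf=...
System.setProperty("java.security.auth.login.config", "/path/to/jaas.conf")
System.setProperty("java.security.krb5.conf", "/path/to/krb5.conf")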

> On Jan 30, 2021, at 13:58, 瞿叶奇 <389243...@qq.com> wrote:
> 
> Attaching the test program; I hope you can suggest a solution. During testing I found that only the modification time of the csv file gets updated, but no actual data is written. I suspect there is still a problem with the Kafka connection.
> #!/usr/bin/python3.7
> # -*- coding: UTF-8 -*-
> from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
> from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, 
> CsvTableSink, WriteMode, SqlDialect
> from pyflink.table.descriptors import FileSystem,OldCsv,Schema,Kafka,Json
> s_env = StreamExecutionEnvironment.get_execution_environment()
> s_env.set_parallelism(1)
> s_env.enable_checkpointing(3000)
> st_env = StreamTableEnvironment.create(s_env, TableConfig())
> st_env.use_catalog("default_catalog")
> st_env.use_database("default_database")
> st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect",
>  
> "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002").property("security.protocol",
>  'SASL_PLAINTEXT').property("sasl.kerberos.service.name", 
> 'kafka').property("kerberos.domain.name", 
> 'hadoop.hadoop.com').property("bootstrap.servers", 
> "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id",
>  DataTypes.BIGINT()),DataTypes.FIELD("name", 
> DataTypes.STRING())]))).with_schema(Schema().field("id", 
> DataTypes.BIGINT()).field("name", 
> DataTypes.STRING())).register_table_source("sourceKafka")
> fieldNames = ["id", "name"]
> fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()]
> csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", ",", 1, 
> WriteMode.OVERWRITE)
> st_env.register_table_sink("csvTableSink", csvSink)
> resultQuery = st_env.sql_query("select id,name from sourceKafka")
> resultQuery = resultQuery.insert_into("csvTableSink")
> st_env.execute("pyflink-kafka-v2")



Re: [ANNOUNCE] Apache Flink 1.12.1 released

2021-01-19 Thread Wei Zhong
Thanks Xintong for the great work!

Best,
Wei

> On Jan 19, 2021, at 18:00, Guowei Ma wrote:
> 
> Thanks Xintong's effort!
> Best,
> Guowei
> 
> 
> On Tue, Jan 19, 2021 at 5:37 PM Yangze Guo  > wrote:
> Thanks Xintong for the great work!
> 
> Best,
> Yangze Guo
> 
> On Tue, Jan 19, 2021 at 4:47 PM Till Rohrmann  > wrote:
> >
> > Thanks a lot for driving this release Xintong. This was indeed a release 
> > with some obstacles to overcome and you did it very well!
> >
> > Cheers,
> > Till
> >
> > On Tue, Jan 19, 2021 at 5:59 AM Xingbo Huang  > > wrote:
> >>
> >> Thanks Xintong for the great work!
> >>
> >> Best,
> >> Xingbo
> >>
> >> Peter Huang wrote on Tue, Jan 19, 2021 at 12:51 PM:
> >>
> >> > Thanks for the great effort to make this happen. It paves us from using
> >> > 1.12 soon.
> >> >
> >> > Best Regards
> >> > Peter Huang
> >> >
> >> > On Mon, Jan 18, 2021 at 8:16 PM Yang Wang  >> > > wrote:
> >> >
> >> > > Thanks Xintong for the great work as our release manager!
> >> > >
> >> > >
> >> > > Best,
> >> > > Yang
> >> > >
> >> > > Xintong Song <xts...@apache.org> wrote on Tue, Jan 19, 2021 at 11:53 AM:
> >> > >
> >> > >> The Apache Flink community is very happy to announce the release of
> >> > >> Apache Flink 1.12.1, which is the first bugfix release for the Apache
> >> > Flink
> >> > >> 1.12 series.
> >> > >>
> >> > >> Apache Flink® is an open-source stream processing framework for
> >> > >> distributed, high-performing, always-available, and accurate data
> >> > streaming
> >> > >> applications.
> >> > >>
> >> > >> The release is available for download at:
> >> > >> https://flink.apache.org/downloads.html 
> >> > >> 
> >> > >>
> >> > >> Please check out the release blog post for an overview of the
> >> > >> improvements for this bugfix release:
> >> > >> https://flink.apache.org/news/2021/01/19/release-1.12.1.html 
> >> > >> 
> >> > >>
> >> > >> The full release notes are available in Jira:
> >> > >>
> >> > >>
> >> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349459
> >> > >>
> >> > >> We would like to thank all contributors of the Apache Flink community
> >> > who
> >> > >> made this release possible!
> >> > >>
> >> > >> Regards,
> >> > >> Xintong
> >> > >>
> >> > >
> >> >



Re: PyFlink on Yarn, per-job mode: how do I add multiple external dependency jars?

2021-01-05 Thread Wei Zhong
Hi Zhizhao,

Could you check whether 'file://' is followed by an absolute path? This error is caused by the corresponding path not being found on the local disk.
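
For reference, a sketch of what that configuration looks like with absolute paths (the jar locations below are placeholders, and t_env is your existing TableEnvironment):

t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///absolute/path/flink-sql-connector-kafka_2.11-1.11.0.jar;"
    "file:///absolute/path/flink-connector-jdbc_2.11-1.11.0.jar;"
    "file:///absolute/path/mysql-connector-java-5.1.38.jar")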

> On Jan 6, 2021, at 10:23, Zhizhao Shangguan wrote:
> 
> Hi:
>   With PyFlink on Yarn in per-job mode, how can I add multiple external dependency jars, such as flink-sql-connector-kafka and flink-connector-jdbc?
> 
>  Environment
>  Flink version: 1.11.0
>  OS: mac
> 
>  I tried the following approaches and ran into some problems:
> 1. Following the CLI documentation [1]: -j can specify a jar, but only one can be added; a second -j has no effect.
> 2. Following the dependency management documentation [2]: using pipeline.jars reports a file-not-found error.
> Configuration
> t_env.get_config().get_configuration().set_string("pipeline.jars", 
> "file:///path/flink-sql-connector-kafka_2.11-1.11.0.jar;file:///path/flink-connector-jdbc_2.11-1.11.0.jar;file:///path/mysql-connector-java-5.1.38.jar")
> 
> Launch command
> # flink run -m yarn-cluster -pyarch venv.zip -pyexec venv.zip/venv/bin/Python 
> -py StreamingKafkaToMysql.py
> 
> Error message
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/dependency_management.html


Re: Does PyFlink 1.12 not support fetching data from a database directly via SQL? I can't find a relevant API

2020-12-22 Thread Wei Zhong
Hi,

PyFlink needs to fetch data from a database by declaring a JDBC connector.
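
For example, a minimal sketch of that pattern (the connection parameters and table name are placeholders, and it assumes flink-connector-jdbc plus the MySQL driver jar are on the classpath); the query result can then be pulled into pandas much like pandas.read_sql():

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_batch_mode().build())

t_env.execute_sql("""
    CREATE TABLE my_source (
        id BIGINT,
        name STRING
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://host:3306/db_base',
        'table-name' = 'my_table',
        'username' = 'xxx',
        'password' = 'xxx'
    )
""")

df = t_env.sql_query("SELECT id, name FROM my_source").to_pandas()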

> On Dec 22, 2020, at 17:40, 肖越 <18242988...@163.com> wrote:
> 
> For example, something like pandas.read_sql(), which returns the source data directly. I'm new to PyFlink and hoping an expert can answer.



Re: In PyFlink 1.12 the result of a multi-table join is a TableResult; how do I convert it to a Table?

2020-12-22 Thread Wei Zhong
Hi,

You can get a result of type Table by executing the select statement with env.sql_query().
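
For example, a minimal sketch (it assumes env is your StreamTableEnvironment and the tables a and b from your statement are already registered):

# sql_query returns a Table, which can be transformed further or inserted into a sink;
# execute_sql on a SELECT returns a TableResult instead.
result_table = env.sql_query(
    "select a.id, a.pf_id, b.symbol_id from a "
    "left join b on b.day_id = a.biz_date "
    "where a.ccy_type = 'AC'")
result_table.execute_insert("print_sink")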

> On Dec 22, 2020, at 13:25, 肖越 <18242988...@163.com> wrote:
> 
> I run a left-join query via SQL; the SQL statement is:
> sql = ''' Insert into print_sink select a.id, a.pf_id, b.symbol_id from  a \
> left join b on b.day_id = a.biz_date where a.ccy_type = 'AC' and \
> a.pf_id = '1030100122' and b.symbol_id = '2030004042' and a.biz_date 
> between '20160701' and '20170307' '''
> 
> 
> table_result = env.execute_sql(sql)
> The result returned by env.execute_sql() is a TableResult; how do I convert it into a Table?
> Or is there some other way to run the join directly and get a Table back as the result?
> 



Re: PyFlink 1.12 fails to connect to MySQL: Missing required options

2020-12-21 Thread Wei Zhong
Hi,

As the error message indicates, the WITH options require a "url" option. You can try changing connector.url to url and see whether the error still occurs.
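
For example, a hedged sketch of the DDL with the option keys that the 'jdbc' connector expects in Flink 1.12 (the connection values are placeholders copied from your mail, and t_env is your existing TableEnvironment):

t_env.execute_sql("""
    CREATE TABLE ts_pf_sec_yldrate (
        id DECIMAL, pf_id VARCHAR, symbol_id VARCHAR, biz_date VARCHAR,
        ccy_type VARCHAR, cur_id_d VARCHAR, yldrate DECIMAL,
        is_valid DECIMAL, time_mark TIMESTAMP
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://ip:port/db_base',
        'table-name' = 'ts_pf_sec_yldrate',
        'driver' = 'com.mysql.jdbc.Driver',
        'username' = 'xxx',
        'password' = 'xxx'
    )
""")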

> On Dec 21, 2020, at 13:44, 肖越 <18242988...@163.com> wrote:
> 
> I defined two source-table DDLs in the script, but the second one always reports a missing option. I'm new to PyFlink; could someone explain?
> # DDL definition
> source_ddl2 = """CREATE TABLE ts_pf_sec_yldrate (id DECIMAL,pf_id VARCHAR,\
> 
>symbol_id VARCHAR,biz_date VARCHAR,\
> 
>ccy_type VARCHAR,cur_id_d VARCHAR,yldrate DECIMAL,\
> 
>is_valid DECIMAL,time_mark TIMESTAMP) WITH (
> 
>'connector' = 'jdbc',
> 
>'connector.url' = 'jdbc:mysql://ip:port/db_base',
> 
>'connector.table' = 'ts_pf_sec_yldrate',
> 
>'table-name' = 'ts_pf_sec_yldrate',
> 
>'connector.driver' = 'com.mysql.jdbc.Driver',
> 
>'connector.username' = 'xxx',
> 
>'connector.password' = 'xxx')
> 
> """
> Error message:
> Traceback (most recent call last):
>  File 
> "C:/projects/dataService-calculate-code-python/src/test/test_mysql_connector.py",
>  line 67, in 
>print(join.to_pandas().head(6))
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table.py",
>  line 807, in to_pandas
>.collectAsPandasDataFrame(self._j_table, max_arrow_batch_size)
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
>  line 1286, in __call__
>answer, self.gateway_client, self.target_id, self.name)
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
>  line 147, in deco
>return f(*a, **kw)
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py",
>  line 328, in get_return_value
>format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
> : org.apache.flink.table.api.ValidationException: Unable to create a source 
> for reading table 'default_catalog.default_database.ts_pf_sec_yldrate'.
> 
> 
> Table options are:
> 
> 
> 'connector'='jdbc'
> 'connector.driver'='com.mysql.jdbc.Driver'
> 'connector.password'='xxx'
> 'connector.table'='ts_pf_sec_yldrate'
> 'connector.url'='jdbc:mysql://ip:port/db_base'
> 'connector.username'='xxx'
> 'table-name'='ts_pf_sec_yldrate'
> at 
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
> at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:265)
> at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:100)
> at 
> org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495)
> at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099)
> at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:339)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:149)
> at 
> org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:146)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
> at 
> org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
> at 
> org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
> at java.util.Collections$SingletonList.forEach(Collections.java:4824)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
> at 
> org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
> at 
> org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
> at java.util.Arrays$ArrayList.forEach(Arrays.java:3880)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
> at 
> org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:62)
> at 
> org.apache.flink.table.operations.JoinQueryOperation.accept(JoinQueryOperation.java:128)
> at 
> 

Re: Urgent help on S3 CSV file reader DataStream Job

2020-12-16 Thread Wei Zhong
Hi Deep,

You can try to change the `FileProcessingMode.PROCESS_ONCE` to 
`FileProcessingMode.PROCESS_CONTINUOUSLY`.

Best,
Wei

> On Dec 15, 2020, at 20:18, DEEP NARAYAN Singh wrote:
> 
> Hi Wei,
> Could you please suggest , how to fix this below issues.
> 
> Thanks & Regards,
> Deep
> 
> On Mon, 14 Dec, 2020, 10:28 AM DEEP NARAYAN Singh,  <mailto:about.d...@gmail.com>> wrote:
> Hi Wei,
> No problem at all.Thanks for your response.
> Yes ,it is just starting from the beginning like no check pointing finished.
> 
> Thanks,
> -Deep
> 
> On Mon, 14 Dec, 2020, 8:01 AM Wei Zhong,  <mailto:weizhong0...@gmail.com>> wrote:
> Hi Deep,
> 
> Sorry for the late reply. Could you provide more specific information about 
> the problem? e.g. did the job skip the file that was being processed during 
> the last checkpointing, or did it start from the beginning just like no 
> checkpointing finished?
> 
> Best,
> Wei
> 
>> On Dec 12, 2020, at 13:14, DEEP NARAYAN Singh <about.d...@gmail.com> wrote:
>> 
>> Hi Wei,
>> I'm sorry to bother you ,could you please help me in clarifying my doubt 
>> which have mentioned in previous email?
>> 
>> Thank you in advance.
>> 
>> Regards,
>> -Deep
>> 
>> On Fri, 11 Dec, 2020, 2:16 PM DEEP NARAYAN Singh, > <mailto:about.d...@gmail.com>> wrote:
>> Hi Wei,
>> Just I want to clarify my doubt about check pointing as part of s3 
>> datastream source . Let say my job started with a current resource and it 
>> got failed in between because of some lack of resource (e.g  Heap space 
>> Exception etc.), In that case what I observed was that if the job is auto 
>> restart by using restart strategy , it was not processing the data from the 
>> last checkpointing .
>> 
>> Could you please help me in how to handle this case as part of s3 data 
>> source.
>> 
>> Thanks,
>> -Deep
>> 
>> On Tue, Dec 8, 2020 at 10:22 PM DEEP NARAYAN Singh > <mailto:about.d...@gmail.com>> wrote:
>> Hi Wei,
>> Thanks you for the clarification. I have implemented the suggest approach 
>> and it is working fine now.
>> 
>> Thanks,
>> -Deep
>> 
>> On Tue, 8 Dec, 2020, 5:24 PM Wei Zhong, > <mailto:weizhong0...@gmail.com>> wrote:
>> Hi Deep,
>> 
>> It seems that the TypeInformation array in your code has 2 elements, but we 
>> only need one here. This approach treats the entire csv file as a Row which 
>> has only a one column, so there should be only one 
>> `BasicTypeInfo.STRING_TYPE_INFO` in the array. And if you use the 
>> TextInputFormat instead of the RowCsvInputFormat, this problem can also be 
>> solved.
>> 
>> If you have created your own InputFormat via extending the 
>> RowCsvInputFormat, you can get the current file path via 
>> `this.currentSplit.getPath()` in your class. Note that if you choose to fill 
>> the file path into the second column of the Row, you do not need to make the 
>> above changes, because at this time we really need the TypeInformation array 
>> to contain two StringTypeInfo elements.
>> 
>> Best,
>> Wei
>> 
>> 
>>> On Dec 8, 2020, at 19:29, DEEP NARAYAN Singh <about.d...@gmail.com> wrote:
>>> 
>>> Hi Wei, 
>>> 
>>> Also I need to know how to get file names  along with single Row data as 
>>> part of Datastream during runtime.So that I can  extract some of the data 
>>> from the file name in the next operator to construct the final json string.
>>> 
>>> Thanks,
>>> -Deep
>>> 
>>> On Tue, Dec 8, 2020 at 4:10 PM DEEP NARAYAN Singh >> <mailto:about.d...@gmail.com>> wrote:
>>> Hi Wei, 
>>> 
>>> Please find the below code snippet:
>>> TypeInformation[] typeInformation = new 
>>> TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO, 
>>> BasicTypeInfo.STRING_TYPE_INFO};
>>> RowCsvInputFormat csvInputFormat = new RowCsvInputFormat(new 
>>> org.apache.flink.core.fs.Path(directory), typeInformation);
>>> csvInputFormat.setDelimiter((char) 0);
>>> csvInputFormat.setFieldDelimiter(String.valueOf((char) 0));
>>> csvInputFormat.setNestedFileEnumeration(true);
>>> csvInputFormat.setMinSplitSize(10);
>>> return environment
>>> .readFile(csvInputFormat, directory, 
>>> FileProcessingMode.PROCESS_ONCE, -1, 
>>> S3Service.createCustomFilter(finalParameters))
>>> .name("Source: Custom File Reader for path " + 
>>> directory).s

Re: Sinking data processed by a pandas UDF

2020-12-13 Thread Wei Zhong
Hi Lucas,

The issue is that this pandas UDF outputs a single Row column, while your sink expects one BIGINT column and one INT column.

You can try changing the SQL statement to the following form:

select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1)
from `some_source`
group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount

Besides, what you have here is actually a pandas UDAF use case, right? If so, you need to change "@udf" to "@udaf".

Best,
Wei

> On Dec 13, 2020, at 13:13, Lucas wrote:
> 
> I'm using Flink 1.12.0 with Python 3.7. I defined a custom pandas UDF roughly as follows:
> 
> @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
> result_type=DataTypes.ROW(
> [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
>  DataTypes.FIELD('aveBuy', DataTypes.INT())]),
> func_type='pandas')
> def orderCalc(code, amount):
> 
>df = pd.DataFrame({'code': code, 'amount': amount})
> # after the pandas processing, the results go into another dataframe named output
> return (output['buyQtl'], output['aveBuy'])
> 
> 
> I defined a csv sink as follows:
> 
> create table csvSink (
>buyQtl BIGINT,
>aveBuy INT 
> ) with (
>'connector.type' = 'filesystem',
>'format.type' = 'csv',
>'connector.path' = 'e:/output'
> )
> 
> 
> 
> Then I perform the following operations:
> 
> result_table = t_env.sql_query("""
> select orderCalc(code, amount)
> from `some_source`
> group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
> """)
> result_table.execute_insert("csvSink")
> 
> 
> 
> When the program runs, it reports that the data cannot be written to the sink:
> 
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o98.executeInsert.
> 
> : org.apache.flink.table.api.ValidationException: Column types of query
> result and sink for registered table
> 'default_catalog.default_database.csvSink' do not match.
> 
> Cause: Different number of columns.
> 
> 
> 
> Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]
> 
> Sink schema:  [buyQtl: BIGINT, aveBuy: INT]
> 
>at
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchEx
> ception(DynamicSinkUtils.java:304)
> 
>at
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApply
> ImplicitCast(DynamicSinkUtils.java:134)
> 
> 
> 
> Is the output structure of the UDF wrong, or do I need to adjust the structure of the sink table?
> 



Re: [ANNOUNCE] Apache Flink 1.12.0 released

2020-12-10 Thread Wei Zhong
Congratulations! Thanks Dian and Robert for the great work!

Best,
Wei

> On Dec 10, 2020, at 20:26, Leonard Xu wrote:
> 
> 
> Thanks Dian and Robert for the great work as release manager ! 
> And thanks everyone who makes the release possible ! 
> 
> 
> Best,
> Leonard
> 
>> On Dec 10, 2020, at 20:17, Robert Metzger wrote:
>> 
>> The Apache Flink community is very happy to announce the release of Apache
>> Flink 1.12.0, which is the latest major release.
>> 
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>> 
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>> 
>> Please check out the release blog post for an overview of the improvements
>> for this bugfix release:
>> https://flink.apache.org/news/2020/12/10/release-1.12.0.html
>> 
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348263
>> 
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>> 
>> Regards,
>> Dian & Robert
> 



Re: Urgent help on S3 CSV file reader DataStream Job

2020-12-08 Thread Wei Zhong
Hi Deep,

It seems that the TypeInformation array in your code has 2 elements, but we 
only need one here. This approach treats the entire csv file as a Row which has 
only a one column, so there should be only one `BasicTypeInfo.STRING_TYPE_INFO` 
in the array. And if you use the TextInputFormat instead of the 
RowCsvInputFormat, this problem can also be solved.

If you have created your own InputFormat via extending the RowCsvInputFormat, 
you can get the current file path via `this.currentSplit.getPath()` in your 
class. Note that if you choose to fill the file path into the second column of 
the Row, you do not need to make the above changes, because at this time we 
really need the TypeInformation array to contain two StringTypeInfo elements.

Best,
Wei


> On Dec 8, 2020, at 19:29, DEEP NARAYAN Singh wrote:
> 
> Hi Wei, 
> 
> Also I need to know how to get file names  along with single Row data as part 
> of Datastream during runtime.So that I can  extract some of the data from the 
> file name in the next operator to construct the final json string.
> 
> Thanks,
> -Deep
> 
> On Tue, Dec 8, 2020 at 4:10 PM DEEP NARAYAN Singh  <mailto:about.d...@gmail.com>> wrote:
> Hi Wei, 
> 
> Please find the below code snippet:
> TypeInformation[] typeInformation = new 
> TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO};
> RowCsvInputFormat csvInputFormat = new RowCsvInputFormat(new 
> org.apache.flink.core.fs.Path(directory), typeInformation);
> csvInputFormat.setDelimiter((char) 0);
> csvInputFormat.setFieldDelimiter(String.valueOf((char) 0));
> csvInputFormat.setNestedFileEnumeration(true);
> csvInputFormat.setMinSplitSize(10);
> return environment
> .readFile(csvInputFormat, directory, FileProcessingMode.PROCESS_ONCE, 
> -1, S3Service.createCustomFilter(finalParameters))
> .name("Source: Custom File Reader for path " + 
> directory).setParallelism(readerParallelism);
> 
> But after that,I have created my own custom RowCsvInputFormat and enabled the 
> csvInputFormat.setLenient(true) and modified the class a little bit then it 
> worked.
> 
> // check valid start position
> if (startPos > limit || (startPos == limit && field != fieldIncluded.length - 
> 1)) {
>if (isLenient()) {
>   return true;
>} else {
>   throw new ParseException("Row too short: " + new String(bytes, offset, 
> numBytes, getCharset()));
>}
> }
> Let me know if you need any details.
> Thanks,
> -Deep
> 
> 
> 
> 
> 
> On Tue, Dec 8, 2020 at 8:13 AM Wei Zhong  <mailto:weizhong0...@gmail.com>> wrote:
> Hi Deep,
> 
> Could you show your current code snippet? I have tried the Csv file data on 
> my local machine and it works fine, so I guess what might be wrong elsewhere.
> 
> Best,
> Wei
> 
> 
> > On Dec 8, 2020, at 03:20, DEEP NARAYAN Singh <about.d...@gmail.com> wrote:
> > 
> > Hi Wei and Till,
> > Thanks for the quick reply.
> > 
> > @Wei, I tried with code which you have suggested and it is working fine but 
> > I have one use case where it is failing, below is the csv input data format 
> > :
> > Csv file data format   :
> > ---
> > field_id,data,
> > A,1
> > B,3
> > C,4
> > D,9
> > E,0,0,0,0
> > 
> > because of last row which contains more that two value, and its is throwing 
> > org.apache.flink.api.common.io.ParseException: Row too short: field_id,data,
> > 
> > How to handle the above corner case.Could you please suggest some way to 
> > handle this.
> > 
> > @Till, Could you please elaborate more which you are suggesting? As per my 
> > use case I am dealing with multiple csv files under the given folder and 
> > reading line by line using TextInputFormat  and transform will not work by 
> > using map operator. Correct me if i'm wrong .
> > 
> > Thanks & Regards,
> > -Deep
> > 
> > 
> > On Mon, Dec 7, 2020 at 6:38 PM Till Rohrmann  > <mailto:trohrm...@apache.org>> wrote:
> > Hi Deep,
> > 
> > Could you use the TextInputFormat which reads a file line by line? That way
> > you can do the JSON parsing as part of a mapper which consumes the file
> > lines.
> > 
> > Cheers,
> > Till
> > 
> > On Mon, Dec 7, 2020 at 1:05 PM Wei Zhong  > <mailto:weizhong0...@gmail.com>> wrote:
> > 
> > > Hi Deep,
> > >
> > > (redirecting this to user mailing list as this is not a dev question)
> > >
> > > You can try to set the line delimiter and field delimiter of t

Re: Urgent help on S3 CSV file reader DataStream Job

2020-12-07 Thread Wei Zhong
Hi Deep,

Could you show your current code snippet? I have tried the Csv file data on my 
local machine and it works fine, so I guess what might be wrong elsewhere.

Best,
Wei


> On Dec 8, 2020, at 03:20, DEEP NARAYAN Singh wrote:
> 
> Hi Wei and Till,
> Thanks for the quick reply.
> 
> @Wei, I tried with code which you have suggested and it is working fine but I 
> have one use case where it is failing, below is the csv input data format :
> Csv file data format   :
> ---
> field_id,data,
> A,1
> B,3
> C,4
> D,9
> E,0,0,0,0
> 
> because of last row which contains more that two value, and its is throwing 
> org.apache.flink.api.common.io.ParseException: Row too short: field_id,data,
> 
> How to handle the above corner case.Could you please suggest some way to 
> handle this.
> 
> @Till, Could you please elaborate more which you are suggesting? As per my 
> use case I am dealing with multiple csv files under the given folder and 
> reading line by line using TextInputFormat  and transform will not work by 
> using map operator. Correct me if i'm wrong .
> 
> Thanks & Regards,
> -Deep
> 
> 
> On Mon, Dec 7, 2020 at 6:38 PM Till Rohrmann  wrote:
> Hi Deep,
> 
> Could you use the TextInputFormat which reads a file line by line? That way
> you can do the JSON parsing as part of a mapper which consumes the file
> lines.
> 
> Cheers,
> Till
> 
> On Mon, Dec 7, 2020 at 1:05 PM Wei Zhong  wrote:
> 
> > Hi Deep,
> >
> > (redirecting this to user mailing list as this is not a dev question)
> >
> > You can try to set the line delimiter and field delimiter of the
> > RowCsvInputFormat to a non-printing character (assume there is no 
> > non-printing
> > characters in the csv files). It will read all the content of a csv file
> > into one Row. e.g.
> >
> > final StreamExecutionEnvironment env =
> >StreamExecutionEnvironment.getExecutionEnvironment();
> > String path = "test";
> > TypeInformation[] fieldTypes = new TypeInformation[]{
> >BasicTypeInfo.STRING_TYPE_INFO};
> > RowCsvInputFormat csvFormat =
> >new RowCsvInputFormat(new Path(path), fieldTypes);
> > csvFormat.setNestedFileEnumeration(true);
> > csvFormat.setDelimiter((char) 0);
> > csvFormat.setFieldDelimiter(String.valueOf((char) 0));
> > DataStream
> >lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE,
> >-1);lines.map(value -> value).print();
> > env.execute();
> >
> >
> > Then you can convert the content of the csv files to json manually.
> >
> > Best,
> > Wei
> >
> >
> > On Dec 7, 2020, at 19:10, DEEP NARAYAN Singh wrote:
> >
> > Hi  Guys,
> >
> > Below is my code snippet , which read all csv files under the given folder
> > row by row but my requirement is to read csv file at a time and  convert as
> > json which will looks like :
> > {"A":"1","B":"3","C":"4","D":9}
> >
> > Csv file data format   :
> > ---
> > *field_id,data,*
> >
> >
> >
> > *A,1B,3C,4D,9*
> >
> > Code snippet:
> > --
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > *final StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();String path =
> > "s3://messages/data/test/dev/2020-12-07/67241306/";TypeInformation[]
> > fieldTypes = new TypeInformation[]{  BasicTypeInfo.STRING_TYPE_INFO,
> >  BasicTypeInfo.STRING_TYPE_INFO};RowCsvInputFormat csvFormat =  new
> > RowCsvInputFormat(new Path(path),
> >
> > fieldTypes);csvFormat.setSkipFirstLineAsHeader(true);csvFormat.setNestedFileEnumeration(true);DataStream
> > lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE,
> > -1);lines.map(value -> value).print();*
> >
> >
> > Any help is highly appreciated.
> >
> > Thanks,
> > -Deep
> >
> >
> >



Re: Urgent help on S3 CSV file reader DataStream Job

2020-12-07 Thread Wei Zhong
Hi Deep,

(redirecting this to user mailing list as this is not a dev question)

You can try to set the line delimiter and field delimiter of the 
RowCsvInputFormat to a non-printing character (assuming there are no non-printing 
characters in the csv files). It will read all the content of a csv file into 
one Row. e.g.

final StreamExecutionEnvironment env =
   StreamExecutionEnvironment.getExecutionEnvironment();
String path = "test";
TypeInformation[] fieldTypes = new TypeInformation[]{
   BasicTypeInfo.STRING_TYPE_INFO};
RowCsvInputFormat csvFormat = 
   new RowCsvInputFormat(new Path(path), fieldTypes);
csvFormat.setNestedFileEnumeration(true);
csvFormat.setDelimiter((char) 0);
csvFormat.setFieldDelimiter(String.valueOf((char) 0));
DataStream<Row> lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE, -1);
lines.map(value -> value).print();
env.execute();

Then you can convert the content of the csv files to json manually.

Best,
Wei


> On Dec 7, 2020, at 19:10, DEEP NARAYAN Singh wrote:
> 
> Hi  Guys,
> 
> Below is my code snippet , which read all csv files under the given folder
> row by row but my requirement is to read csv file at a time and  convert as
> json which will looks like :
> {"A":"1","B":"3","C":"4","D":9}
> 
> Csv file data format   :
> ---
> field_id,data,
> A,1
> B,3
> C,4
> D,9
> 
> Code snippet:
> --
> 
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> String path = "s3://messages/data/test/dev/2020-12-07/67241306/";
> TypeInformation[] fieldTypes = new TypeInformation[]{
>     BasicTypeInfo.STRING_TYPE_INFO,
>     BasicTypeInfo.STRING_TYPE_INFO};
> RowCsvInputFormat csvFormat = new RowCsvInputFormat(new Path(path), fieldTypes);
> csvFormat.setSkipFirstLineAsHeader(true);
> csvFormat.setNestedFileEnumeration(true);
> DataStream<Row> lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE, -1);
> lines.map(value -> value).print();
> 
> 
> Any help is highly appreciated.
> 
> Thanks,
> -Deep



Re: Parsing complex JSON in SQL

2020-12-04 Thread Wei Zhong
Yes, in 1.11 custom parsing and mapping of JSON can only be done outside the json format.
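
For illustration, a rough sketch of the UDF route in PyFlink (the function name, field and types here are made-up assumptions, not taken from your job): declare the nested part as a plain STRING column and do the custom mapping yourself:

from pyflink.table import DataTypes
from pyflink.table.udf import udf
import json

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def extract_user_name(user_info_json):
    # The custom parsing/mapping logic lives here instead of in the json format.
    return json.loads(user_info_json).get("name", "")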

> On Dec 4, 2020, at 17:19, 李轲 wrote:
> 
> If I want to do custom parsing and mapping in 1.11, is a UDF the only way?
> 
> Sent from my iPhone
> 
>> On Dec 4, 2020, at 16:52, Wei Zhong wrote:
>> 
>> Hi,
>> 
>> This depends on the Flink version you are using. In 1.11 it is derived automatically from the table schema, while in 1.10, if the table schema and the JSON schema are not exactly the same, you have to write the json-schema manually:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#data-type-mapping
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#json-format
>> 
>> 
>>> 在 2020年12月4日,16:39,guaishushu1...@163.com 写道:
>>> 
>>> 麻烦问下我已经在字段上面定了结构,还需要再写format.json-schema吗?CREATE TABLE user_log(
>>>  id VARCHAR,
>>>  timestam VARCHAR,
>>>  user_info ROW(user_id string, name string ),
>>>  jsonArray ARRAY
>>> ) WITH (
>>>  'connector.type' = 'kafka',
>>>  'connector.version' = 'universal',
>>>  'connector.topic' = 'complex_string',
>>>  'connector.properties.zookeeper.connect' = 'venn:2181',
>>>  'connector.properties.bootstrap.servers' = 'venn:9092',
>>>  'connector.startup-mode' = 'earliest-offset',
>>>  'format.type' = 'json',
>>>  'format.json-schema' = '{
>>>  "type": "object",
>>>  "properties": {
>>> "id": {type: "string"},
>>> "timestam": {type: "string"},
>>> "user_info":{type: "object",
>>> "properties" : {
>>> "user_id" : {type:"string"},
>>> "name":{type:"string"}
>>> }
>>>   },
>>>  "jsonArray":{"type": "array",
>>>   "items": {
>>>"type": "object",
>>>"properties" : {
>>>"user_id222" : {type:"string"},
>>>"name222" : {type:"string"}
>>>   }
>>>}
>>>   }
>>>  }
>>>  }'
>>> );
>>> 
>>> 
>>> 
>>> 
>>> guaishushu1...@163.com
>> 
>> 
> 



Re: Parsing complex JSON in SQL

2020-12-04 Thread Wei Zhong
Hi,

This depends on the Flink version you are using. In 1.11 it is derived automatically from the table schema, while in 1.10, if the table schema and the JSON schema are not exactly the same, you have to write the json-schema manually:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#data-type-mapping
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#json-format
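
In other words, on 1.11 a DDL like the following sketch should be enough (it is trimmed from the table quoted below, keeps the original connector options, omits the array column and assumes a TableEnvironment named t_env); the json format derives its schema from the declared columns:

t_env.execute_sql("""
    CREATE TABLE user_log (
        id VARCHAR,
        timestam VARCHAR,
        user_info ROW<user_id STRING, name STRING>
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'complex_string',
        'connector.properties.zookeeper.connect' = 'venn:2181',
        'connector.properties.bootstrap.servers' = 'venn:9092',
        'connector.startup-mode' = 'earliest-offset',
        'format.type' = 'json'
    )
""")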
 



> On Dec 4, 2020, at 16:39, guaishushu1...@163.com wrote:
> 
> May I ask: since I have already declared the structure on the fields, do I still need to write format.json-schema as well? CREATE TABLE user_log(
>id VARCHAR,
>timestam VARCHAR,
>user_info ROW(user_id string, name string ),
>jsonArray ARRAY
> ) WITH (
>'connector.type' = 'kafka',
>'connector.version' = 'universal',
>'connector.topic' = 'complex_string',
>'connector.properties.zookeeper.connect' = 'venn:2181',
>'connector.properties.bootstrap.servers' = 'venn:9092',
>'connector.startup-mode' = 'earliest-offset',
>'format.type' = 'json',
>'format.json-schema' = '{
>"type": "object",
>"properties": {
>   "id": {type: "string"},
>   "timestam": {type: "string"},
>   "user_info":{type: "object",
>   "properties" : {
>   "user_id" : {type:"string"},
>   "name":{type:"string"}
>   }
> },
>"jsonArray":{"type": "array",
> "items": {
>  "type": "object",
>  "properties" : {
>  "user_id222" : {type:"string"},
>  "name222" : {type:"string"}
> }
>  }
> }
>}
>}'
> );
> 
> 
> 
> 
> guaishushu1...@163.com



Re: Running Flink SQL from code fails with "Multiple factories for identifier 'jdbc' that implement"

2020-12-03 Thread Wei Zhong
The problem that the error message does not show which factories conflict has already been fixed for later versions; once a new version is released and you upgrade, you will be able to see directly from the error message which TableFactory classes are in conflict:

https://issues.apache.org/jira/browse/FLINK-20186



> On Dec 3, 2020, at 20:08, Wei Zhong wrote:
> 
> Hi,
> 
> The current code that looks up the TableFactory seems to have a problem with how it reports errors: the real class names are not shown. You can first run the following code manually to check which classes are being matched as the JDBC DynamicTableSinkFactory:
> 
> List<Factory> result = new LinkedList<>();
> ServiceLoader
>     .load(Factory.class, Thread.currentThread().getContextClassLoader())
>     .iterator()
>     .forEachRemaining(result::add);
> List<Factory> jdbcResult = result.stream()
>     .filter(f -> DynamicTableSinkFactory.class.isAssignableFrom(f.getClass()))
>     .filter(f -> f.factoryIdentifier().equals("jdbc"))
>     .collect(Collectors.toList());
> System.out.println(jdbcResult);
> 
> 
>> 在 2020年12月3日,19:50,hailongwang <18868816...@163.com 
>> <mailto:18868816...@163.com>> 写道:
>> 
>> Hi,
>> 你是用哪个版本的呢?有没有自己继承了 DynamicTableSinkFactory 实现的 factoryIdentifier 方法返回 `JDCB` 
>> 的 Connector?
>> 
>> 
>> Best,
>> Hailong
>> On 2020-12-03 14:44:18, "xuzh" <huazhe...@qq.com> wrote:
>>> Error:
>>> 
>>> 
>>> Caused by: org.apache.flink.table.api.ValidationException: Multiple 
>>> factories for identifier 'jdbc' that implement 
>>> 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the 
>>> classpath
>>> 
>>> 
>>> It looks like two identical classes were found: DynamicTableSinkFactory
>>> 
>>> 
>>> The code is as follows:
>>> package org.apache.flink.examples;
>>> 
>>> 
>>> import 
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>>> import org.apache.flink.table.factories.DynamicTableSinkFactory;
>>> 
>>> 
>>> public class CDC2ss2 {
>>>   public static void main(String[] args) throws Exception {
>>> 
>>> 
>>> // set up execution environment
>>> StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> StreamTableEnvironment tEnv;
>>> 
>>> 
>>> EnvironmentSettings settings = 
>>> EnvironmentSettings.newInstance()
>>> .useBlinkPlanner()
>>> .inStreamingMode()
>>> .build();
>>> tEnv = StreamTableEnvironment.create(env, 
>>> settings);
>>> String src_sql = "CREATE TABLE userss (\n" +
>>> "  
>>> user_id INT,\n" +
>>> "  
>>> user_nm STRING\n" +
>>> ") WITH (\n" +
>>> "  
>>>  'connector' = 'mysql-cdc',\n" +
>>> "  
>>>  'hostname' = '10.12.5.37',\n" +
>>> "  
>>>  'port' = '3306',\n" +
>>> "  
>>>  'username' = 'dps',\n" +
>>> "  
>>>  'password' = 'dps1234',\n" +
>>> "  
>>>  'database-name' = 'rpt',\n" +
>>> "  
>>>  'table-name' = 'users'\n" +
>>> "  
>>>  )";
>>> 
>>> 
>>> tEnv.executeSql(src_sql); // create the source table
>>> 
>>> 
>>> String sink="CREATE TABLE sink (\n" +
>>> "  
>>> user_id INT,\n" +
>>> "  
>>> user_nm STRING,\n" +
>>> "  
>>> primary key(user_id) NOT ENFORCED \n" +
>>> ") WITH (\n" +
>>> "  
>>>  'connector' = 'jdbc',\n" +
>>> "  
>>>  'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n 
>>> " +
>>> "  
>>>  'username' = 'dps',\n" +
>>> "  
>>>  'password' = 'dps1234',\n" +
>>> "  
>>>  'table-name' = 'sink'\n" +
>>> "  
>>>  )";
>>> String to_print_sql="insert into sink select 
>>> user_id ,user_nm from userss";
>>> tEnv.executeSql(sink);
>>> tEnv.executeSql(to_print_sql);
>>> env.execute();
>>>   }
>>> 
>>> 
>>> }
>>> 
>>> 
>>> 
>>> 
>>> 
>>> Detailed error:
>>> 
>>> 
>>> Exception in thread &qu

Re: Running Flink SQL from code fails with "Multiple factories for identifier 'jdbc' that implement"

2020-12-03 Thread Wei Zhong
Hi,

The current code that looks up the TableFactory seems to have a problem with how it reports errors: the real class names are not shown. You can first run the following code manually to check which classes are being matched as the JDBC DynamicTableSinkFactory:

List<Factory> result = new LinkedList<>();
ServiceLoader
    .load(Factory.class, Thread.currentThread().getContextClassLoader())
    .iterator()
    .forEachRemaining(result::add);
List<Factory> jdbcResult = result.stream()
    .filter(f -> DynamicTableSinkFactory.class.isAssignableFrom(f.getClass()))
    .filter(f -> f.factoryIdentifier().equals("jdbc"))
    .collect(Collectors.toList());
System.out.println(jdbcResult);


> On Dec 3, 2020, at 19:50, hailongwang <18868816...@163.com> wrote:
> 
> Hi,
> Which version are you using? Do you have your own Connector that extends DynamicTableSinkFactory and whose factoryIdentifier method returns `jdbc`?
> 
> 
> Best,
> Hailong
> On 2020-12-03 14:44:18, "xuzh" wrote:
>> Error:
>> 
>> 
>> Caused by: org.apache.flink.table.api.ValidationException: Multiple 
>> factories for identifier 'jdbc' that implement 
>> 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the 
>> classpath
>> 
>> 
>> It looks like two identical classes were found: DynamicTableSinkFactory
>> 
>> 
>> The code is as follows:
>> package org.apache.flink.examples;
>> 
>> 
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>> import org.apache.flink.table.factories.DynamicTableSinkFactory;
>> 
>> 
>> public class CDC2ss2 {
>>   public static void main(String[] args) throws Exception {
>> 
>> 
>> // set up execution environment
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> StreamTableEnvironment tEnv;
>> 
>> 
>> EnvironmentSettings settings = 
>> EnvironmentSettings.newInstance()
>> .useBlinkPlanner()
>> .inStreamingMode()
>> .build();
>> tEnv = StreamTableEnvironment.create(env, 
>> settings);
>> String src_sql = "CREATE TABLE userss (\n" +
>> "  
>> user_id INT,\n" +
>> "  
>> user_nm STRING\n" +
>> ") WITH (\n" +
>> "  
>>  'connector' = 'mysql-cdc',\n" +
>> "  
>>  'hostname' = '10.12.5.37',\n" +
>> "  
>>  'port' = '3306',\n" +
>> "  
>>  'username' = 'dps',\n" +
>> "  
>>  'password' = 'dps1234',\n" +
>> "  
>>  'database-name' = 'rpt',\n" +
>> "  
>>  'table-name' = 'users'\n" +
>> "  
>>  )";
>> 
>> 
>> tEnv.executeSql(src_sql); // create the source table
>> 
>> 
>> String sink="CREATE TABLE sink (\n" +
>> "  
>> user_id INT,\n" +
>> "  
>> user_nm STRING,\n" +
>> "  
>> primary key(user_id) NOT ENFORCED \n" +
>> ") WITH (\n" +
>> "  
>>  'connector' = 'jdbc',\n" +
>> "  
>>  'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
>> "  
>>  'username' = 'dps',\n" +
>> "  
>>  'password' = 'dps1234',\n" +
>> "  
>>  'table-name' = 'sink'\n" +
>> "  
>>  )";
>> String to_print_sql="insert into sink select 
>> user_id ,user_nm from userss";
>> tEnv.executeSql(sink);
>> tEnv.executeSql(to_print_sql);
>> env.execute();
>>   }
>> 
>> 
>> }
>> 
>> 
>> 
>> 
>> 
>> Detailed error:
>> 
>> 
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>> Unable to create a sink for writing table 
>> 'default_catalog.default_database.sink'.
>> 
>> 
>> Table options are:
>> 
>> 
>> 'connector'='jdbc'
>> 'password'='dps1234'
>> 'table-name'='sink'
>> 'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
>> 'username'='dps'
>>  at 
>> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>>  at 
>> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>>  at 
>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>>  at 
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>  at 
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>  at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>  at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>  at 
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>>  at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>>  at 
>> 

Re: Dynamically loading patterns in Flink CEP

2020-12-02 Thread Wei Zhong
Hi,

Flink CEP does not support dynamically loading rules yet. There is a JIRA in the community tracking this requirement:

https://issues.apache.org/jira/browse/FLINK-7129

You can follow that JIRA for the latest progress.

> On Dec 2, 2020, at 17:48, huang botao wrote:
> 
> Hi, in our project we frequently run into rule changes. How do people usually load these rules dynamically? Does Flink CEP have a native API that supports dynamically loading rules?



Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-18 Thread Wei Zhong
Hi Pierre,

Currently there is no type hint like ‘Map[String, Any]’. The recommended way is 
declaring your type more explicitly.

If you insist on doing this, you can try declaring a RAW data type for 
java.util.HashMap [1], but you may encounter some troubles [2] related to the 
kryo serializers.

Best,
Wei

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw>
[2] 
https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
 
<https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class>


> On Nov 19, 2020, at 04:31, Pierre Oberholzer wrote:
> 
> Hi Wei,
> 
> It works ! Thanks a lot for your support.
> I hadn't tried this last combination for option 1, and I had wrong syntax for 
> option 2.
> 
> So to summarize..
> 
> Methods working:
> - Current: DataTypeHint in UDF definition + SQL for UDF registering
> - Outdated: override getResultType in UDF definition + 
> t_env.register_java_function for UDF registering
> 
> Type conversions working:
> - scala.collection.immutable.Map[String,String] => org.apache.flink.types.Row 
> => ROW
> - scala.collection.immutable.Map[String,String] => 
> java.util.Map[String,String] => MAP
> 
> Any hint for Map[String,Any] ?
> 
> Best regards,
> 
> On Wed, Nov 18, 2020 at 03:26, Wei Zhong <weizhong0...@gmail.com> wrote:
> Hi Pierre,
> 
> Those 2 approaches all work in my local machine, this is my code:
> 
> Scala UDF:
> package com.dummy
> 
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.table.annotation.DataTypeHint
> import org.apache.flink.table.api.Types
> import org.apache.flink.table.functions.ScalarFunction
> import org.apache.flink.types.Row
> 
> /**
>   * The scala UDF.
>   */
> class dummyMap extends ScalarFunction {
> 
>   // If the udf would be registered by the SQL statement, you need add this 
> typehint
>   @DataTypeHint("ROW")
>   def eval(): Row = {
> 
> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
> 
>   }
> 
>   // If the udf would be registered by the method 'register_java_function', 
> you need override this
>   // method.
>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] 
> = {
> // The type of the return values should be TypeInformation
> Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), 
> Types.STRING()))
>   }
> }
> Python code:
> 
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment
> 
> s_env = StreamExecutionEnvironment.get_execution_environment()
> st_env = StreamTableEnvironment.create(s_env)
> 
> # load the scala udf jar file, the path should be modified to yours
> # or your can also load the jar file via other approaches
> st_env.get_config().get_configuration().set_string("pipeline.jars", 
> "file:///Users/zhongwei/the-dummy-udf.jar <>")
> 
> # register the udf via 
> st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE 
> SCALA")
> # or register via the method
> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
> 
> # prepare source and sink
> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 
> 'c'])
> st_env.execute_sql("""create table mySink (
> output_of_my_scala_udf ROW
> ) with (
> 'connector' = 'print'
> )""")
> 
> # execute query
> t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
> 
> Best,
> Wei
> 
>> On Nov 18, 2020, at 03:28, Pierre Oberholzer > <mailto:pierre.oberhol...@gmail.com>> wrote:
>> 
>> Hi Wei,
>> 
>> True, I'm using the method you mention, but glad to change. 
>> I tried your suggestion instead, but got a similar error.
>> 
>> Thanks for your support. That is much more tedious than I thought.
>> 
>> Option 1 - SQL UDF
>> 
>> SQL UDF
>> create_func_ddl = """
>> CREATE FUNCTION dummyMap 
>>   AS 'com.dummy.dummyMap' LANGUAGE SCALA
>> """
>>   
>> t_env.execute_sql(create_func_ddl)
>> 
>> Error
>> Py4JJavaError: An error occurred while calling o672.execute.
>> : org.apache.flink.table.api.TableException: Result field does not match 
>> requ

Re: PyFlink SQL DDL connection to HBase 1.4.x fails: Configuring the input format (null) failed: Cannot create connection to HBase

2020-11-18 Thread Wei Zhong
Hi,

The root cause is that the class io.netty.channel.EventLoopGroup cannot be found. Could you check whether the classpath contains a netty jar, or whether the netty library has been shaded (relocated) inside one of the related jars?
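
(A small diagnostic sketch, in case it is useful: it scans the candidate jars for the missing class. The HBase lib path is the one reported below; the Flink lib path is only an assumption and should be adjusted to the actual installation.)

import glob
import zipfile

# Jars to inspect; adjust the paths to your environment.
candidates = glob.glob("/opt/hbase/hbase-1.4.13/lib/*.jar") + \
             glob.glob("/opt/flink-1.11.1/lib/*.jar")

for jar in candidates:
    try:
        with zipfile.ZipFile(jar) as zf:
            names = zf.namelist()
    except zipfile.BadZipFile:
        continue
    # Unshaded netty keeps the original package name.
    if "io/netty/channel/EventLoopGroup.class" in names:
        print("plain netty found in:", jar)
    # A shaded copy is relocated under another package prefix.
    elif any(n.endswith("/netty/channel/EventLoopGroup.class") for n in names):
        print("shaded (relocated) netty found in:", jar)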

> On Nov 16, 2020, at 17:02, ghostviper  wrote:
> 
> *Environment:*
> hbase-1.4.13
> flink-1.11.1
> python-3.6.1
> pyflink-1.0
> 
> *Configuration already in place:*
> 1. The HBase path has been added to the hadoop classpath (:/opt/hbase/hbase-1.4.13/lib/*)
> 2. The DDL in the program is configured as follows:
> 
> source_ddl = """CREATE TABLE MySourceTable (
> hbase_rowkey_name varchar, cf1 ROW) WITH (
>'connector.type' = 'hbase',
>'connector.version' = '1.4.3',  
>'connector.table-name' = 'flink-test',
>'connector.zookeeper.quorum' =
> 'es-zk-hadoop1:2181,es-zk-hadoop2:2181,es-zk-hadoop3:2181',
>'connector.zookeeper.znode.parent' = '/hbase')
> """
> 
> sink_ddl = """CREATE TABLE MySinkTable (
> hbase_rowkey_name varchar, cf1 ROW) WITH (
>'connector.type' = 'hbase',
>'connector.version' = '1.4.3',  
>'connector.table-name' = 'flink-test-result',
>'connector.zookeeper.quorum' =
> 'es-zk-hadoop1:2181,es-zk-hadoop2:2181,es-zk-hadoop3:2181',
>'connector.zookeeper.znode.parent' = '/hbase')
> """
> 3. ZooKeeper has no authentication
> 4. Hive can access HBase
> 5. HBase shell commands execute correctly
> 6. The HBase cluster status is normal
> 7. The jars under the HBase lib directory are:
> ./hbase-common-1.4.3.jar
> ./flink-connector-hbase_2.11-1.11.1.jar
> ./hbase-client-1.4.3.jar
> ./hbase-protocol-1.4.3.jar
> 
> 
> *Error message:*
> Traceback (most recent call last):
>  File "read_hbase.py", line 46, in 
>st_env.execute("7-read_and_hbase")
>  File
> "/home/chenxiaoyun/.pyenv/versions/3.6.1/lib/python3.6/site-packages/pyflink/table/table_environment.py",
> line 1057, in execute
>return JobExecutionResult(self._j_tenv.execute(job_name))
>  File
> "/home/chenxiaoyun/.pyenv/versions/3.6.1/lib/python3.6/site-packages/py4j/java_gateway.py",
> line 1286, in __call__
>answer, self.gateway_client, self.target_id, self.name)
>  File
> "/home/chenxiaoyun/.pyenv/versions/3.6.1/lib/python3.6/site-packages/pyflink/util/exceptions.py",
> line 147, in deco
>return f(*a, **kw)
>  File
> "/home/chenxiaoyun/.pyenv/versions/3.6.1/lib/python3.6/site-packages/py4j/protocol.py",
> line 328, in get_return_value
>format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute.
> : org.apache.flink.util.FlinkException: Failed to execute job
> '7-read_and_hbase'.
>   at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1823)
>   at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1713)
>   at
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
>   at
> org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
>   at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>   at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to
> submit job.
>   at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:344)
>   at
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>   at
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>   at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>   at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: 

Re: PyFlink 1.11: error when running a PyFlink job

2020-11-18 Thread Wei Zhong
Hi,

The current error alone does not reveal the problem. Could you paste the source code of the part of the job that fails?

> On Nov 17, 2020, at 16:58, whh_960101  wrote:
> 
> Hi all, with PyFlink 1.11 I submit a PyFlink job to a YARN cluster. The job fails when inserting the processed main_table 
> into the Kafka sink: File "/home/cdh272705/poc/T24_parse.py", line 179, in 
> from_kafka_to_oracle_demo
>
> main_table.execute_insert("sink")#.get_job_client().get_job_execution_result().result()
>  File 
> "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/table/table.py", 
> line 783, in execute_insert
>return TableResult(self._j_table.executeInsert(table_path, overwrite))
>  File 
> "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", 
> line 1286, in __call__
>answer, self.gateway_client, self.target_id, self.name)
>  File 
> "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py",
>  line 154, in deco
>raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
> pyflink.util.exceptions.TableException: 'Failed to execute sql'
> org.apache.flink.client.program.ProgramAbortException
> at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at 
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)  Is this caused by a missing jar? I have placed flink-sql-client_2.11-1.11.1.jar under both the opt and lib directories. What is the reason for 'Failed
>  to execute sql'?
> 
> 
> 
> 
> 
> 
> 
> 
> 



Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Wei Zhong
Hi Pierre,

Those 2 approaches all work in my local machine, this is my code:

Scala UDF:
package com.dummy

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

/**
  * The scala UDF.
  */
class dummyMap extends ScalarFunction {

  // If the udf would be registered by the SQL statement, you need add this 
typehint
  @DataTypeHint("ROW")
  def eval(): Row = {

Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))

  }

  // If the udf would be registered by the method 'register_java_function', you 
need override this
  // method.
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
// The type of the return values should be TypeInformation
Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), 
Types.STRING()))
  }
}
Python code:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

s_env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(s_env)

# load the scala udf jar file, the path should be modified to yours
# or your can also load the jar file via other approaches
st_env.get_config().get_configuration().set_string("pipeline.jars", 
"file:///Users/zhongwei/the-dummy-udf.jar")

# register the udf via 
st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE 
SCALA")
# or register via the method
# st_env.register_java_function("dummyMap", "com.dummy.dummyMap")

# prepare source and sink
t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 
'c'])
st_env.execute_sql("""create table mySink (
output_of_my_scala_udf ROW
) with (
'connector' = 'print'
)""")

# execute query
t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()

Best,
Wei

> On Nov 18, 2020, at 03:28, Pierre Oberholzer  wrote:
> 
> Hi Wei,
> 
> True, I'm using the method you mention, but glad to change. 
> I tried your suggestion instead, but got a similar error.
> 
> Thanks for your support. That is much more tedious than I thought.
> 
> Option 1 - SQL UDF
> 
> SQL UDF
> create_func_ddl = """
> CREATE FUNCTION dummyMap 
>   AS 'com.dummy.dummyMap' LANGUAGE SCALA
> """
>   
> t_env.execute_sql(create_func_ddl)
> 
> Error
> Py4JJavaError: An error occurred while calling o672.execute.
> : org.apache.flink.table.api.TableException: Result field does not match 
> requested type. Requested: Row(s: String, t: String); Actual: 
> GenericType
> 
> Option 2 - Overriding getResultType
> 
> Back to the old registering method, but overriding getResultType:
> 
> t_env.register_java_function("dummyMap","com.dummy.dummyMap")
> 
> Scala UDF
> class dummyMap() extends ScalarFunction {
> 
>   def eval(): Row = {
> 
>   Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
> 
>   }
> 
>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] 
> = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
> }
> 
> Error (on compilation)
> 
> [error] dummyMap.scala:66:90: overloaded method value ROW with alternatives:
> [error]   (x$1: 
> org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType
>  
> [error]   ()org.apache.flink.table.types.DataType 
> [error]   (x$1: 
> org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
> [error]  cannot be applied to (org.apache.flink.table.types.DataType, 
> org.apache.flink.table.types.DataType)
> [error]   override def getResultType(signature: Array[Class[_]]): 
> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
> [error]   
>^
> [error] one error found
> [error] (Compile / compileIncremental) Compilation failed
> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
> 
> On Tue, Nov 17, 2020 at 14:01, Wei Zhong  <mailto:weizhong0...@gmail.com>> wrote:
> Hi Pierre,
> 
> I guess your UDF is registered by the method 'register_java_function' which 
> uses the old type system. In this situation you need to override the 
> 'getResultType' method instead of adding type hint. 
> 
> You can also try to register your UDF via the "CREATE FUNCTION" sql 
> statement, which accepts the type hint.
> 
> Be

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Wei Zhong
Hi Pierre,

I guess your UDF is registered by the method 'register_java_function' which 
uses the old type system. In this situation you need to override the 
'getResultType' method instead of adding type hint. 

You can also try to register your UDF via the "CREATE FUNCTION" sql statement, 
which accepts the type hint.

Best,
Wei

> On Nov 17, 2020, at 19:29, Pierre Oberholzer  wrote:
> 
> Hi Wei,
> 
> Thanks for your suggestion. Same error.
> 
> Scala UDF
> 
> @FunctionHint(output = new DataTypeHint("ROW"))
> class dummyMap() extends ScalarFunction {
>   def eval(): Row = {
> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>   }
> }
> 
> Best regards,
> 
> On Tue, Nov 17, 2020 at 10:04, Wei Zhong  <mailto:weizhong0...@gmail.com>> wrote:
> Hi Pierre,
> 
> You can try to replace the '@DataTypeHint("ROW")' with 
> '@FunctionHint(output = new DataTypeHint("ROW”))'
> 
> Best,
> Wei
> 
>> On Nov 17, 2020, at 15:45, Pierre Oberholzer > <mailto:pierre.oberhol...@gmail.com>> wrote:
>> 
>> Hi Dian, Community,
>> 
>> (bringing the thread back to wider audience)
>> 
>> As you suggested, I've tried to use DataTypeHint with Row instead of Map but 
>> also this simple case leads to a type mismatch between UDF and Table API.
>> I've also tried other Map objects from Flink (table.data.MapData, 
>> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java 
>> (java.util.Map) in combination with DataTypeHint, without success.
>> N.B. I'm using version 1.11.
>> 
>> Am I doing something wrong or am I facing limitations in the toolkit ?
>> 
>> Thanks in advance for your support !
>> 
>> Best regards,
>> 
>> Scala UDF
>> 
>> class dummyMap() extends ScalarFunction {
>> 
>>  @DataTypeHint("ROW")
>>  def eval(): Row = {
>> 
>> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>> 
>>   }
>> }
>> 
>> Table DDL
>> 
>> my_sink_ddl = f"""
>> create table mySink (
>> output_of_my_scala_udf ROW
>> ) with (
>> ...
>> )
>> """
>> 
>> Error
>> 
>> Py4JJavaError: An error occurred while calling o2.execute.
>> : org.apache.flink.table.api.ValidationException: Field types of query 
>> result and registered TableSink 
>> `default_catalog`.`default_database`.`mySink` do not match.
>> Query result schema: [output_of_my_scala_udf: 
>> GenericType]
>> TableSink schema:[output_of_my_scala_udf: Row(s: String, t: String)]
>> 
>> 
>> 
>> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer > <mailto:pierre.oberhol...@gmail.com>> wrote:
>> Thanks Dian, but same error when using explicit returned type:
>> 
>> class dummyMap() extends ScalarFunction {
>> 
>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>> 
>> val states = Map("key1" -> "val1", "key2" -> "val2")
>> states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>> 
>>   }
>> }
>> 
>> On Fri, Nov 13, 2020 at 10:34, Dian Fu > <mailto:dian0511...@gmail.com>> wrote:
>> You need to explicitly define the result type of the UDF. You could refer to 
>> [1] for more details if you are using Flink 1.11. If you are using other 
>> versions of Flink, you need to refer to the corresponding documentation.
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide>
>>> On Nov 13, 2020, at 4:56 PM, Pierre Oberholzer >> <mailto:pierre.oberhol...@gmail.com>> wrote:
>>> 
>>> ScalarFunction
>> 
>> 
>> 
>> -- 
>> Pierre
>> 
>> -- 
>> Pierre
> 
> 
> 
> -- 
> Pierre



Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Wei Zhong
Hi Pierre,

You can try to replace the '@DataTypeHint("ROW")' with 
'@FunctionHint(output = new DataTypeHint("ROW”))'

Best,
Wei

> On Nov 17, 2020, at 15:45, Pierre Oberholzer  wrote:
> 
> Hi Dian, Community,
> 
> (bringing the thread back to wider audience)
> 
> As you suggested, I've tried to use DataTypeHint with Row instead of Map but 
> also this simple case leads to a type mismatch between UDF and Table API.
> I've also tried other Map objects from Flink (table.data.MapData, 
> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java 
> (java.util.Map) in combination with DataTypeHint, without success.
> N.B. I'm using version 1.11.
> 
> Am I doing something wrong or am I facing limitations in the toolkit ?
> 
> Thanks in advance for your support !
> 
> Best regards,
> 
> Scala UDF
> 
> class dummyMap() extends ScalarFunction {
> 
>  @DataTypeHint("ROW")
>  def eval(): Row = {
> 
> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
> 
>   }
> }
> 
> Table DDL
> 
> my_sink_ddl = f"""
> create table mySink (
> output_of_my_scala_udf ROW
> ) with (
> ...
> )
> """
> 
> Error
> 
> Py4JJavaError: An error occurred while calling o2.execute.
> : org.apache.flink.table.api.ValidationException: Field types of query result 
> and registered TableSink `default_catalog`.`default_database`.`mySink` do not 
> match.
> Query result schema: [output_of_my_scala_udf: 
> GenericType]
> TableSink schema:[output_of_my_scala_udf: Row(s: String, t: String)]
> 
> 
> 
> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer  > wrote:
> Thanks Dian, but same error when using explicit returned type:
> 
> class dummyMap() extends ScalarFunction {
> 
>   def eval() : util.Map[java.lang.String,java.lang.String] = {
> 
> val states = Map("key1" -> "val1", "key2" -> "val2")
> states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
> 
>   }
> }
> 
> On Fri, Nov 13, 2020 at 10:34, Dian Fu  > wrote:
> You need to explicitly define the result type of the UDF. You could refer to 
> [1] for more details if you are using Flink 1.11. If you are using other 
> versions of Flink, you need to refer to the corresponding documentation.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>  
> 
>> On Nov 13, 2020, at 4:56 PM, Pierre Oberholzer > > wrote:
>> 
>> ScalarFunction
> 
> 
> 
> -- 
> Pierre
> 
> -- 
> Pierre



Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-28 Thread Wei Zhong
Congratulations Dian!

> On Aug 28, 2020, at 14:29, Jingsong Li  wrote:
> 
> Congratulations , Dian!
> 
> Best, Jingsong
> 
> On Fri, Aug 28, 2020 at 11:06 AM Walter Peng  > wrote:
> congrats!
> 
> Yun Tang wrote:
> > Congratulations , Dian!
> 
> 
> -- 
> Best, Jingsong Lee



Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-05 Thread Wei Zhong
r the FLIP.
> >>
> >> I think this will bring big benefits for the PyFlink users. Currently,
> >> the Python TableAPI document is hidden deeply under the TableAPI tab
> >> which makes it quite unreadable. Also, the PyFlink documentation is mixed
> >> with Java/Scala documentation. It is hard for users to have an overview of
> >> all the PyFlink documents. As more and more functionalities are added into
> >> PyFlink, I think it's time for us to refactor the document.
> >>
> >> Best,
> >> Hequn
> >>
> >>
> >> On Fri, Jul 31, 2020 at 3:43 PM Marta Paes Moreira  >> <mailto:ma...@ververica.com>>
> >> wrote:
> >>
> >>> Hi, Jincheng!
> >>>
> >>> Thanks for creating this detailed FLIP, it will make a big difference in
> >>> the experience of Python developers using Flink. I'm interested in
> >>> contributing to this work, so I'll reach out to you offline!
> >>>
> >>> Also, thanks for sharing some information on the adoption of PyFlink,
> >>> it's
> >>> great to see that there are already production users.
> >>>
> >>> Marta
> >>>
> >>> On Fri, Jul 31, 2020 at 5:35 AM Xingbo Huang  >>> <mailto:hxbks...@gmail.com>> wrote:
> >>>
> >>> > Hi Jincheng,
> >>> >
> >>> > Thanks a lot for bringing up this discussion and the proposal.
> >>> >
> >>> > Big +1 for improving the structure of PyFlink doc.
> >>> >
> >>> > It will be very friendly to give PyFlink users a unified entrance to
> >>> learn
> >>> > PyFlink documents.
> >>> >
> >>> > Best,
> >>> > Xingbo
> >>> >
> >>> > Dian Fu mailto:dian0511...@gmail.com>> 
> >>> > wrote on Fri, Jul 31, 2020 at 11:00 AM:
> >>> >
> >>> >> Hi Jincheng,
> >>> >>
> >>> >> Thanks a lot for bringing up this discussion and the proposal. +1 to
> >>> >> improve the Python API doc.
> >>> >>
> >>> >> I have received many feedbacks from PyFlink beginners about
> >>> >> the PyFlink doc, e.g. the materials are too few, the Python doc is
> >>> mixed
> >>> >> with the Java doc and it's not easy to find the docs he wants to know.
> >>> >>
> >>> >> I think it would greatly improve the user experience if we can have
> >>> one
> >>> >> place which includes most knowledges PyFlink users should know.
> >>> >>
> >>> >> Regards,
> >>> >> Dian
> >>> >>
> >>> >> On Jul 31, 2020, at 10:14 AM, jincheng sun  >>> >> <mailto:sunjincheng...@gmail.com>> wrote:
> >>> >>
> >>> >> Hi folks,
> >>> >>
> >>> >> Since the release of Flink 1.11, users of PyFlink have continued to
> >>> grow.
> >>> >> As far as I know there are many companies have used PyFlink for data
> >>> >> analysis, operation and maintenance monitoring business has been put
> >>> into
> >>> >> production(Such as 聚美优品[1](Jumei),  浙江墨芷[2] (Mozhi) etc.).  According
> >>> to
> >>> >> the feedback we received, current documentation is not very friendly
> >>> to
> >>> >> PyFlink users. There are two shortcomings:
> >>> >>
> >>> >> - Python related content is mixed in the Java/Scala documentation,
> >>> which
> >>> >> makes it difficult for users who only focus on PyFlink to read.
> >>> >> - There is already a "Python Table API" section in the Table API
> >>> document
> >>> >> to store PyFlink documents, but the number of articles is small and
> >>> the
> >>> >> content is fragmented. It is difficult for beginners to learn from it.
> >>> >>
> >>> >> In addition, FLIP-130 introduced the Python DataStream API. Many
> >>> >> documents will be added for those new APIs. In order to increase the
> >>> >> readability and maintainability of the PyFlink document, Wei Zhong
> >>> and me
> >>> >> have discussed offline and would like to rework it via this FLIP.
> >>> >>
> >>> >> We will rework the document around the following three objectives:
> >>> >>
> >>> >> - Add a separate section for Python API under the "Application
> >>> >> Development" section.
> >>> >> - Restructure current Python documentation to a brand new structure to
> >>> >> ensure complete content and friendly to beginners.
> >>> >> - Improve the documents shared by Python/Java/Scala to make it more
> >>> >> friendly to Python users and without affecting Java/Scala users.
> >>> >>
> >>> >> More detail can be found in the FLIP-133:
> >>> >>
> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-133%3A+Rework+PyFlink+Documentation
> >>>  
> >>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-133%3A+Rework+PyFlink+Documentation>
> >>> >>
> >>> >> Best,
> >>> >> Jincheng
> >>> >>
> >>> >> [1] https://mp.weixin.qq.com/s/zVsBIs1ZEFe4atYUYtZpRg 
> >>> >> <https://mp.weixin.qq.com/s/zVsBIs1ZEFe4atYUYtZpRg>
> >>> >> [2] https://mp.weixin.qq.com/s/R4p_a2TWGpESBWr3pLtM2g 
> >>> >> <https://mp.weixin.qq.com/s/R4p_a2TWGpESBWr3pLtM2g>
> >>> >>
> >>> >>
> >>> >>
> >>>
> >>



Re: PyFlink DDL UDTF join error

2020-07-29 Thread Wei Zhong
Hi Manas,

It seems to be a bug in the create view operation. I have created a JIRA issue for it: 
https://issues.apache.org/jira/browse/FLINK-18750 
<https://issues.apache.org/jira/browse/FLINK-18750>

Until it is fixed, please avoid using the create view operation for the UDTF call. 
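
Applied to the job in this thread, the workaround would look roughly like the sketch below (a minimal sketch: t_env, the Kafka source/sink tables, the `split` UDTF and the two table-name constants are assumed to be set up exactly as in the original job; only the CREATE VIEW step is replaced):

# Build the intermediate view via join_lateral instead of CREATE VIEW ... LATERAL TABLE.
t_env.register_table(
    "tmp_view",
    t_env.from_path(INPUT_TABLE).join_lateral(
        "split(data) as (featureName, featureValue)"))

# The windowed aggregation is unchanged and reads from the registered view.
t_env.execute_sql(f"""
    INSERT INTO {TEN_SEC_OUTPUT_TABLE}
    SELECT monitorId, featureName, MAX(featureValue), MIN(featureValue),
           AVG(featureValue), TUMBLE_START(time_str, INTERVAL '10' SECOND)
    FROM tmp_view
    GROUP BY TUMBLE(time_str, INTERVAL '10' SECOND), monitorId, featureName
""")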

Best,
Wei

> On Jul 28, 2020, at 21:19, Wei Zhong  wrote:
> 
> Hi Manas,
> 
> It seems like a bug. As a workaround for now, you can replace the UDTF SQL call 
> with code like this:
> 
> t_env.register_table("tmp_view", 
> t_env.from_path(f"{INPUT_TABLE}").join_lateral("split(data) as (featureName, 
> featureValue)"))
> 
> This works for me. I’ll try to find out what caused this exception.
> 
> Best,
> Wei
> 
>> On Jul 28, 2020, at 18:33, Manas Kale > <mailto:manaskal...@gmail.com>> wrote:
>> 
>> Hi,
>> Using pyFlink DDL, I am trying to:
>> Consume a Kafka JSON stream. This has messages with aggregate data, example: 
>>  "data": 
>> "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}"
>> I am splitting field "data" so that I can process its values individually. 
>> For that, I have defined a UDTF.
>> I store the UDTF output in a temporary view. (Meaning each output of the 
>> UDTF will contain "0001" 105.0, "0002" 1.21 etc...)
>> I use the values in this temporary view to calculate some aggregation 
>> metrics.
>> I am getting an SQL error for  step 4.
>> Code:
>> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, 
>> DataTypes, Row
>> from pyflink.table.udf import udtf
>> from json import loads
>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>> t_env = StreamTableEnvironment.create(exec_env, 
>> environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
>> 
>> 
>> @udtf(input_types=DataTypes.STRING(), result_types= 
>> [DataTypes.STRING(),DataTypes.DOUBLE()])
>> def split_feature_values(data_string):
>> json_data = loads(data_string)
>> for f_name, f_value in json_data.items():
>> yield f_name, f_value
>> 
>> # configure the off-heap memory of current taskmanager to enable the python 
>> worker uses off-heap memory.
>> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
>>  '80m')
>> 
>> # Register UDTF
>> t_env.register_function("split", split_feature_values)
>> # ... string constants
>> 
>> # Init Kafka input table
>> t_env.execute_sql(f"""   
>> CREATE TABLE {INPUT_TABLE} (
>> monitorId STRING,
>> deviceId STRING,
>> state INT,
>> data STRING,
>> time_str TIMESTAMP(3),
>> WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' 
>> SECOND
>> ) WITH (
>> 'connector' = 'kafka',
>> 'topic' = '{INPUT_TOPIC}',
>> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>> 'format' = 'json'
>> )
>> """)
>> 
>> # 10 sec summary table
>> t_env.execute_sql(f"""
>> CREATE TABLE {TEN_SEC_OUTPUT_TABLE} (
>> monitorId STRING,
>> featureName STRING,
>> maxFv DOUBLE,
>> minFv DOUBLE,
>> avgFv DOUBLE,
>> windowStart TIMESTAMP(3),
>> WATERMARK FOR windowStart AS windowStart
>> ) WITH (
>> 'connector' = 'kafka',
>> 'topic' = '{TEN_SEC_OUTPUT_TOPIC}',
>> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>> 'format' = 'json'
>> )
>> """)
>> 
>> # Join with UDTF
>> t_env.execute_sql(f"""
>> CREATE VIEW tmp_view AS
>> SELECT * FROM ( 
>> SELECT monitorId, T.featureName, T.featureValue, time_str
>> FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) as T(featureName, 
>> featureValue)
>> )
>> """)
>> 
>> # Create 10 second view <- this causes the error
>> t_env.execute_sql(f"""
>> INSERT INTO {TEN_SEC_OUTPUT_TABLE}
>> SELECT monitorId, featureName, MAX(featureValue), MIN(featureValue), 
>> AVG(featureValue), TUMBLE_START(time_str, INTER

Re: PyFlink DDL UDTF join error

2020-07-28 Thread Wei Zhong
Hi Manas,

It seems like a bug. As a workaround for now, you can replace the UDTF SQL call with 
code like this:

t_env.register_table("tmp_view", 
t_env.from_path(f"{INPUT_TABLE}").join_lateral("split(data) as (featureName, 
featureValue)"))

This works for me. I’ll try to find out what caused this exception.

Best,
Wei

> On Jul 28, 2020, at 18:33, Manas Kale  wrote:
> 
> Hi,
> Using pyFlink DDL, I am trying to:
> 1. Consume a Kafka JSON stream. This has messages with aggregate data, example:  
> "data": 
> "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}"
> 2. I am splitting field "data" so that I can process its values individually. 
> For that, I have defined a UDTF.
> 3. I store the UDTF output in a temporary view. (Meaning each output of the UDTF 
> will contain "0001" 105.0, "0002" 1.21 etc...)
> 4. I use the values in this temporary view to calculate some aggregation metrics.
> I am getting an SQL error for step 4.
> Code:
> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, 
> DataTypes, Row
> from pyflink.table.udf import udtf
> from json import loads
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> t_env = StreamTableEnvironment.create(exec_env, 
> environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
> 
> 
> @udtf(input_types=DataTypes.STRING(), result_types= 
> [DataTypes.STRING(),DataTypes.DOUBLE()])
> def split_feature_values(data_string):
> json_data = loads(data_string)
> for f_name, f_value in json_data.items():
> yield f_name, f_value
> 
> # configure the off-heap memory of current taskmanager to enable the python 
> worker uses off-heap memory.
> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
>  '80m')
> 
> # Register UDTF
> t_env.register_function("split", split_feature_values)
> # ... string constants
> 
> # Init Kafka input table
> t_env.execute_sql(f"""   
> CREATE TABLE {INPUT_TABLE} (
> monitorId STRING,
> deviceId STRING,
> state INT,
> data STRING,
> time_str TIMESTAMP(3),
> WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' 
> SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = '{INPUT_TOPIC}',
> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
> 'format' = 'json'
> )
> """)
> 
> # 10 sec summary table
> t_env.execute_sql(f"""
> CREATE TABLE {TEN_SEC_OUTPUT_TABLE} (
> monitorId STRING,
> featureName STRING,
> maxFv DOUBLE,
> minFv DOUBLE,
> avgFv DOUBLE,
> windowStart TIMESTAMP(3),
> WATERMARK FOR windowStart AS windowStart
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = '{TEN_SEC_OUTPUT_TOPIC}',
> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
> 'format' = 'json'
> )
> """)
> 
> # Join with UDTF
> t_env.execute_sql(f"""
> CREATE VIEW tmp_view AS
> SELECT * FROM ( 
> SELECT monitorId, T.featureName, T.featureValue, time_str
> FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) as T(featureName, 
> featureValue)
> )
> """)
> 
> # Create 10 second view <- this causes the error
> t_env.execute_sql(f"""
> INSERT INTO {TEN_SEC_OUTPUT_TABLE}
> SELECT monitorId, featureName, MAX(featureValue), MIN(featureValue), 
> AVG(featureValue), TUMBLE_START(time_str, INTERVAL '10' SECOND)
> FROM tmp_view
> GROUP BY TUMBLE(time_str, INTERVAL '10' SECOND), monitorId, featureName
> """)
> 
> The last SQL statement where I calculate metrics causes the error. The error 
> message is :
> Traceback (most recent call last):
>   File 
> "/home/manas/IU_workspace/Flink_POC/pyflink/aggregate_streaming_job.py", line 
> 97, in 
> """)
>   File 
> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table_environment.py",
>  line 543, in execute_sql
> return TableResult(self._j_tenv.executeSql(stmt))
>   File 
> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py",
>  line 1286, in __call__
> answer, self.gateway_client, self.target_id, self.name 
> )
>   File 
> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py",
>  line 147, in deco
> return f(*a, **kw)
>   File 
> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py",
>  line 328, in get_return_value
> format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
> : org.apache.flink.table.api.ValidationException: SQL validation failed. From 
> line 4, column 15 to line 4, column 20: Column 'data' not found in any table
> 
> I don't understand why 

Re: [ANNOUNCE] Apache Flink 1.11.1 released

2020-07-22 Thread Wei Zhong
Congratulations! Thanks Dian for the great work!

Best,
Wei

> On Jul 22, 2020, at 15:09, Leonard Xu  wrote:
> 
> Congratulations!
> 
> Thanks Dian Fu for the great work as release manager, and thanks everyone 
> involved!
> 
> Best
> Leonard Xu
> 
>> On Jul 22, 2020, at 14:52, Dian Fu  wrote:
>> 
>> The Apache Flink community is very happy to announce the release of Apache 
>> Flink 1.11.1, which is the first bugfix release for the Apache Flink 1.11 
>> series.
>> 
>> Apache Flink® is an open-source stream processing framework for distributed, 
>> high-performing, always-available, and accurate data streaming applications.
>> 
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>> 
>> Please check out the release blog post for an overview of the improvements 
>> for this bugfix release:
>> https://flink.apache.org/news/2020/07/21/release-1.11.1.html
>> 
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348323
>> 
>> We would like to thank all contributors of the Apache Flink community who 
>> made this release possible!
>> 
>> Regards,
>> Dian
> 



Re: Using PyFlink on Windows

2020-04-27 Thread Wei Zhong
Hi Tao,

Windows support for PyFlink is under development and is expected to ship in 1.11. That will solve the problems of developing PyFlink on Windows.

> On Apr 28, 2020, at 10:23, tao siyuan  wrote:
> 
> OK, I will give it a try.
> 
> Zhefu PENG  wrote on Tue, Apr 28, 2020 at 10:16 AM:
> 
>> You can try adding everything under site-packages to the external libraries; that helps improve development efficiency.
>> 
>> On Tue, Apr 28, 2020 at 10:13 tao siyuan  wrote:
>> 
>>> Currently PyCharm does not support PyFlink development. On Windows,
>>> are there other effective tools or approaches that make PyFlink development, debugging and source tracking more convenient?
>>> 
>> 



Re: [VOTE] Release Flink Python API(PyFlink) 1.9.2 to PyPI, release candidate #1

2020-02-10 Thread Wei Zhong
Hi,

Thanks for driving this, Jincheng.

+1 (non-binding) 

- Verified signatures and checksums.
- Verified README.md and setup.py.
- Run `pip install apache-flink-1.9.2.tar.gz` in Python 2.7.15 and Python 3.7.5 
successfully.
- Start local pyflink shell in Python 2.7.15 and Python 3.7.5 via 
`pyflink-shell.sh local` and try the examples in the help message, run well and 
no exception.
- Try a word count example in IDE with Python 2.7.15 and Python 3.7.5, run well 
and no exception.

Best,
Wei


> On Feb 10, 2020, at 19:12, jincheng sun  wrote:
> 
> Hi everyone,
> 
> Please review and vote on the release candidate #1 for the PyFlink version 
> 1.9.2, as follows:
> 
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
> 
> The complete staging area is available for your review, which includes:
> 
> * the official Apache binary convenience releases to be deployed to 
> dist.apache.org  [1], which are signed with the key 
> with fingerprint 8FEA1EE9D0048C0CCC70B7573211B0703B79EA0E [2] and built from 
> source code [3].
> 
> The vote will be open for at least 72 hours. It is adopted by majority 
> approval, with at least 3 PMC affirmative votes.
> 
> Thanks,
> Jincheng
> 
> [1] https://dist.apache.org/repos/dist/dev/flink/flink-1.9.2-rc1/ 
> 
> [2] https://dist.apache.org/repos/dist/release/flink/KEYS 
> 
> [3] https://github.com/apache/flink/tree/release-1.9.2 
> 


Re: [DISCUSS] Upload the Flink Python API 1.9.x to PyPI for user convenience.

2020-02-03 Thread Wei Zhong
Hi Jincheng,

Thanks for bringing up this discussion!

+1 for this proposal. Building from source takes a long time and requires a
good network environment. Some users may not have such an environment.
Uploading to PyPI will greatly improve the user experience.

Best,
Wei

jincheng sun  wrote on Tue, Feb 4, 2020 at 11:49 AM:

> Hi folks,
>
> I am very happy to receive some user inquiries about the use of Flink
> Python API (PyFlink) recently. One of the more common questions is whether
> it is possible to install PyFlink without using source code build. The most
> convenient and natural way for users is to use `pip install apache-flink`.
> We originally planned to support the use of `pip install apache-flink` in
> Flink 1.10, but the reason for this decision was that when Flink 1.9 was
> released at August 22, 2019[1], Flink's PyPI account system was not ready.
> At present, our PyPI account is available at October 09, 2019 [2](Only PMC
> can access), So for the convenience of users I propose:
>
> - Publish the latest release version (1.9.2) of Flink 1.9 to PyPI.
> - Update Flink 1.9 documentation to add support for `pip install`.
>
> As we all know, Flink 1.9.2 was just completed released at January 31, 2020
> [3]. There is still at least 1 to 2 months before the release of 1.9.3, so
> my proposal is completely considered from the perspective of user
> convenience. Although the proposed work is not large, we have not set a
> precedent for independent release of the Flink Python API(PyFlink) in the
> previous release process. I hereby initiate the current discussion and look
> forward to your feedback!
>
> Best,
> Jincheng
>
> [1]
>
> https://lists.apache.org/thread.html/4a4d23c449f26b66bc58c71cc1a5c6079c79b5049c6c6744224c5f46%40%3Cdev.flink.apache.org%3E
> [2]
>
> https://lists.apache.org/thread.html/8273a5e8834b788d8ae552a5e177b69e04e96c0446bb90979444deee%40%3Cprivate.flink.apache.org%3E
> [3]
>
> https://lists.apache.org/thread.html/ra27644a4e111476b6041e8969def4322f47d5e0aae8da3ef30cd2926%40%3Cdev.flink.apache.org%3E
>


Re: [ANNOUNCE] Dian Fu becomes a Flink committer

2020-01-16 Thread Wei Zhong
Congrats Dian Fu! Well deserved!

Best,
Wei

> On Jan 16, 2020, at 18:10, Hequn Cheng  wrote:
> 
> Congratulations, Dian.
> Well deserved!
> 
> Best, Hequn 
> 
> On Thu, Jan 16, 2020 at 6:08 PM Leonard Xu  > wrote:
> Congratulations!  Dian Fu
> 
> Best,
> Leonard
> 
>> On Jan 16, 2020, at 18:00, Jeff Zhang mailto:zjf...@gmail.com>> 
>> wrote:
>> 
>> Congrats Dian Fu !
>> 
>> jincheng sun mailto:sunjincheng...@gmail.com>> 
>> wrote on Thu, Jan 16, 2020 at 5:58 PM:
>> Hi everyone,
>> 
>> I'm very happy to announce that Dian accepted the offer of the Flink PMC to 
>> become a committer of the Flink project.
>> 
>> Dian Fu has been contributing to Flink for many years. Dian Fu played an 
>> essential role in PyFlink/CEP/SQL/Table API modules. Dian Fu has contributed 
>> several major features, reported and fixed many bugs, spent a lot of time 
>> reviewing pull requests and also frequently helping out on the user mailing 
>> lists and check/vote the release.
>>  
>> Please join in me congratulating Dian for becoming a Flink committer !
>> 
>> Best, 
>> Jincheng(on behalf of the Flink PMC)
>> 
>> 
>> -- 
>> Best Regards
>> 
>> Jeff Zhang
> 



Re: [ANNOUNCE] Apache Flink 1.8.3 released

2019-12-11 Thread Wei Zhong
Thanks Hequn for being the release manager. Great work!

Best,
Wei

> On Dec 12, 2019, at 15:27, Jingsong Li  wrote:
> 
> Thanks Hequn for driving this; 1.8.3 fixed a lot of issues and is very 
> useful to users.
> Great work!
> 
> Best,
> Jingsong Lee
> 
> On Thu, Dec 12, 2019 at 3:25 PM jincheng sun  > wrote:
> Thanks for being the release manager and the great work Hequn :)
> Also thanks to the community making this release possible!
> 
> Best,
> Jincheng
> 
> Jark Wu mailto:imj...@gmail.com>> wrote on Thu, Dec 12, 2019 at 3:23 PM:
> Thanks Hequn for helping out this release and being the release manager.
> Great work!
> 
> Best,
> Jark
> 
> On Thu, 12 Dec 2019 at 15:02, Jeff Zhang  > wrote:
> 
> > Great work, Hequn
> >
> > Dian Fu mailto:dian0511...@gmail.com>> 
> > wrote on Thu, Dec 12, 2019 at 2:32 PM:
> >
> >> Thanks Hequn for being the release manager and everyone who contributed
> >> to this release.
> >>
> >> Regards,
> >> Dian
> >>
> >> On Dec 12, 2019, at 2:24 PM, Hequn Cheng  >> > wrote:
> >>
> >> Hi,
> >>
> >> The Apache Flink community is very happy to announce the release of
> >> Apache Flink 1.8.3, which is the third bugfix release for the Apache Flink
> >> 1.8 series.
> >>
> >> Apache Flink® is an open-source stream processing framework for
> >> distributed, high-performing, always-available, and accurate data streaming
> >> applications.
> >>
> >> The release is available for download at:
> >> https://flink.apache.org/downloads.html 
> >> 
> >>
> >> Please check out the release blog post for an overview of the
> >> improvements for this bugfix release:
> >> https://flink.apache.org/news/2019/12/11/release-1.8.3.html 
> >> 
> >>
> >> The full release notes are available in Jira:
> >> https://issues.apache.org/jira/projects/FLINK/versions/12346112 
> >> 
> >>
> >> We would like to thank all contributors of the Apache Flink community who
> >> made this release possible!
> >> Great thanks to @Jincheng as a mentor during this release.
> >>
> >> Regards,
> >> Hequn
> >>
> >>
> >>
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
> 
> 
> -- 
> Best, Jingsong Lee



Re: Error consuming Kafka data via the Python API in yarn-session mode

2019-12-09 Thread Wei Zhong
Hi 改改,

Judging from the current error, this is probably a Kafka version mismatch. The kafka connector jar placed into the lib directory 
needs to be the 0.11 version, i.e. flink-sql-connector-kafka-0.11_2.11-1.9.1.jar.
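
(For reference, a minimal sketch of the source descriptor, shortened to a single field; st_env is assumed to be the StreamTableEnvironment already created in tumble_window.py, and the topic and host values are taken from the error log. The point is that the version passed to the Kafka descriptor must match the connector jar placed under flink/lib.)

from pyflink.table.descriptors import Kafka, Json, Schema

st_env.connect(
        Kafka()
        .version("0.11")  # must match flink-sql-connector-kafka-0.11_2.11-1.9.1.jar in lib/
        .topic("user_01")
        .property("zookeeper.connect", "hdp03:2181")
        .property("bootstrap.servers", "hdp02:6667")
        .start_from_earliest()) \
    .with_format(
        Json()
        .fail_on_missing_field(True)
        .json_schema("{ type: 'object', properties: { col1: { type: 'string' } } }")) \
    .with_schema(
        Schema()
        .field("col1", "VARCHAR")) \
    .in_append_mode() \
    .register_table_source("source")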

> On Dec 10, 2019, at 10:06, 改改  wrote:
> 
> Hi Wei Zhong,
>  Thanks for your reply. The kafka connector jar is already under the flink lib directory; the contents of my flink/lib directory are as follows:
>  
>  <5600791664319709.png>
> 
> In addition, my cluster environment is:
>  java :1.8.0_231
>  flink: 1.9.1
>  Python 3.6.9
>  Hadoop 3.1.1.3.1.4.0-315
> 
> Yesterday I also tried running it with Python 3.6 and it still fails, with the following error:
> 
> [root@hdp02 data_team_workspace]# /opt/flink-1.9.1/bin/flink run -py 
> tumble_window.py
> Starting execution of program
> Traceback (most recent call last):
>   File 
> "/tmp/pyflink/3fb6ccfd-482f-4426-859a-ebe003e14769/pyflink.zip/pyflink/util/exceptions.py",
>  line 147, in deco
>   File 
> "/tmp/pyflink/3fb6ccfd-482f-4426-859a-ebe003e14769/py4j-0.10.8.1-src.zip/py4j/protocol.py",
>  line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> o42.registerTableSource.
> : org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>  at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
>  at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
>  at 
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>  at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>  at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>  at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>  at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>  at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
> not find a suitable table factory for 
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
> Reason: No context matches.
> The following properties are requested:
> connector.properties.0.key=zookeeper.connect
> connector.properties.0.value=hdp03:2181
> connector.properties.1.key=bootstrap.servers
> connector.properties.1.value=hdp02:6667
> connector.property-version=1
> connector.startup-mode=earliest-offset
> connector.topic=user_01
> connector.type=kafka
> connector.version=0.11
> format.fail-on-missing-field=true
> format.json-schema={  type: 'object',  properties: {col1: {  type: 
> 'string'},col2: {  type: 'string'},col3: {  type: 
> 'string'},time: {  type: 'string',  format: 'date-time'}  
> }}
> format.property-version=1
> format.type=json
> schema.0.name=rowtime
> schema.0.rowtime.timestamps.from=time
> schema.0.rowtime.timestamps.type=from-field
> schema.0.rowtime.watermarks.delay=6
> schema.0.rowtime.watermarks.type=periodic-bounded
> schema.0.type=TIMESTAMP
> schema.1.name=col1
> schema.1.type=VARCHAR
> schema.2.name=col2
> schema.2.type=VARCHAR
> schema.3.name=col3
> schema.3.type=VARCHAR
> update-mode=append
> The following factories have been considered:
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> org.apache.flink.formats.csv.CsvRowFormatFactory
> org.apache.flink.addons.hbase.HBaseTableFactory
> org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
> org.apache.flink.formats.json.JsonRowFormatFactory
> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
> org.apache.flink.table.planner.StreamPlannerFactory
> org.apache.flink.table.executor.StreamExecutorFactory
> org.apache.flink.table.planner.delegation.BlinkPlannerFactory
> org.apache.flink.table.planner.delegation.BlinkExecutorFactory
>  at 
> org.apache.flink.table.factories.TableFactoryService.filterByContext(

Re: Error consuming Kafka data via the Python API in yarn-session mode

2019-12-09 Thread Wei Zhong
Hi 改改,

This error alone does not contain enough information to be sure, but one likely cause is that the kafka 
connector jar has not been placed under the lib directory. Could you check whether the kafka connector jar exists under your flink lib directory?

> On Dec 6, 2019, at 14:36, 改改  wrote:
> 
> 
> [root@hdp02 bin]# ./flink run -yid application_1575352295616_0014 -py 
> /opt/tumble_window.py
> 2019-12-06 14:15:48,262 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - Found Yarn properties file under /tmp/.yarn-properties-root.
> 2019-12-06 14:15:48,262 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - Found Yarn properties file under /tmp/.yarn-properties-root.
> 2019-12-06 14:15:48,816 INFO  org.apache.hadoop.yarn.client.RMProxy   
>   - Connecting to ResourceManager at 
> hdp02.wuagecluster/10.2.19.32:8050
> 2019-12-06 14:15:48,964 INFO  org.apache.hadoop.yarn.client.AHSProxy  
>   - Connecting to Application History server at 
> hdp03.wuagecluster/10.2.19.33:10200
> 2019-12-06 14:15:48,973 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2019-12-06 14:15:48,973 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2019-12-06 14:15:49,101 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Found 
> application JobManager host name 'hdp07.wuagecluster' and port '46376' from 
> supplied application id 'application_1575352295616_0014'
> Starting execution of program
> Traceback (most recent call last):
>  File "/usr/lib64/python2.7/runpy.py", line 162, in _run_module_as_main
>"__main__", fname, loader, pkg_name)
>  File "/usr/lib64/python2.7/runpy.py", line 72, in _run_code
>exec code in run_globals
>  File "/tmp/pyflink/b9a29ae4-89ac-4289-9111-5f77ad90d386/tumble_window.py", 
> line 62, in 
>.register_table_source("source")
>  File 
> "/tmp/pyflink/b9a29ae4-89ac-4289-9111-5f77ad90d386/pyflink.zip/pyflink/table/descriptors.py",
>  line 1293, in register_table_source
>  File 
> "/tmp/pyflink/b9a29ae4-89ac-4289-9111-5f77ad90d386/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 1286, in __call__
>  File 
> "/tmp/pyflink/b9a29ae4-89ac-4289-9111-5f77ad90d386/pyflink.zip/pyflink/util/exceptions.py",
>  line 154, in deco
> pyflink.util.exceptions.TableException: u'findAndCreateTableSource failed.'
> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
> at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:83)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)