Re: pyflink 引用第三库的文件出现安装权限的问题

2020-12-14 文章 Xingbo Huang
Hi,

光看报错,不知道是哪一步的权限不足导致的,你要不尝试wheel包传上去试试看吧,tar包源码安装会执行setup.py,可能会读写yarn上某些没有权限的目录啥的。

Best,
Xingbo


pyflink 引用第三库的文件出现安装权限的问题

2020-12-14 文章 magichuang
请教一下大家,在本地直接python demo.py是可以运行的,但是提交到集群就会报错

flink 版本:1.11 flink on yarn集群模式部署, per-job模式提交,三台机器




提交命令:flink run -m yarn-cluster -ynm demo  -ys 2 -ytm 2048 -p 2 -py demo.py




代码截图地址:https://s3.ax1x.com/2020/12/15/rKIwE6.png




报错截图地址:https://s3.ax1x.com/2020/12/15/rKIlNT.png




requestments.txt:IPy==1.0cache_dir:  IPy-1.00.tar.gz




自定义udf代码:

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())

def judge_ip(ip):

import IPy

if ip in IPy.IP('192.168.112.0/28'):

return 'in'

return 'out'







祝好~

怎么配置log4j2 properties 文件中的Kafka Appender

2020-12-14 文章 Fanshu
Hi,

在log4j 1.x 版本中,配置KafkaAppender属性只需要直接赋值,如
log4j.appender.kafka.compressionType=gzip,但是升级到log4j
2之后,Kafka相关属性都要分别设置name和value,如下:
appender.kafka.property.name=compression.type
appender.kafka.property.value=gzip

这就导致想设置其他kafka属性的时候(比如retries, lingerMs),会反复覆盖
appender.kafka.property.name/value。

请问这种情况有什么比较好的解决办法吗?谢谢!


Re: Re: pyflink是否可调试

2020-12-14 文章 guoliubi...@foxmail.com
1.13版本pyflink很多优化项啊,期待下个版本



guoliubi...@foxmail.com
 
发件人: Xingbo Huang
发送时间: 2020-12-15 10:37
收件人: user-zh
主题: Re: Re: pyflink是否可调试
Hi,
 
客户端写的python代码会在一个客户端的进程通过py4j调用flink
java的代码去编译你的作业(这里有一个客户端的Python进程,只是用来编译代码生成pipeline),
然后实际运行时,非python代码部分(就是非各种udf的逻辑是运行在JVM里面的),python部分(各种udf)是运行在另一个Python进程里面的。
实际上,在下一个1.13版本我们有考虑在你本地运行调试的时候,将实际运行python代码的重连回客户端那个编译Python代码的进程,这样可以更利于你的本地调试,就不用开启remote
debug了。
 
Best,
Xingbo
 
guoliubi...@foxmail.com  于2020年12月15日周二 上午10:29写道:
 
> Hi Xingbo,
>
> 多谢指导,亲测有效。
> 源python文件运行一会儿本身就结束运行了,过阵子后才会跳到断点里。
> 所以源python文件只是做了个提交的动作,实际执行都是异步执行,是否可以这么理解?
> 如果是的话,之前已经运行过很多次源python文件,是否本地已经在后台异步运行了多次?是的话是否能监控到这些任务?
>
>
>
> guoliubi...@foxmail.com
>
> 发件人: Xingbo Huang
> 发送时间: 2020-12-15 09:59
> 收件人: user-zh
> 主题: Re: pyflink是否可调试
> Hi,
> 想要调试可以使用的方式为
> 1. 在PyCharm里创建一个Python Remote Debug
> run -> Python Remote Debug -> + -> 选择一个端口(比如6789)
>
> 2. 安装pydevd-pycharm(你PyCharm使用的python解释器)
> pip install pydevd-pycharm
> 其实上一步那个界面也有指导你安装了
>
> 3.  将以下代码插入到你要断点的udaf的代码前面(这段代码其实也是来自第一步创建remote debug里面)
> import pydevd_pycharm
> pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True,
> stderrToServer=True)
>
> 例如
> @udaf(result_type=DataTypes.INT(), func_type="pandas")
> def mean_udaf(v):
> import pydevd_pycharm
> pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True,
> stderrToServer=True)
> v.mean()
>
> 4. 在你的PyCharm里面点击Debug启动你刚刚创建的Remote Debug Server
>
> 5. 直接点击Run运行你的作业,这个时候会断点在你的udaf的代码里面了
>
> Best,
> Xingbo
>
> guoliubi...@foxmail.com  于2020年12月15日周二 上午9:25写道:
>
> >
> >
> 基于flink1.12.0版本的pyflink开发了一个程序,使用了udaf,想在本地的PyCharm环境调试该功能,在udaf的第一行打了断点,但是没法进入。
> > 可以确认程序有正确运行,因为sink到kafka里看了是有数据的。
> > 请问是否需要什么配置才能进行调试。
> >
> >
> >
> > guoliubi...@foxmail.com
> >
>


Re:回复: 如何让FlinkSQL访问到阿里云MaxCompute上的表?

2020-12-14 文章 陈帅



CREATETABLEuser_behavior(user_idBIGINT,item_idBIGINT,category_idBIGINT,behaviorSTRING,tsTIMESTAMP(3))WITH('connector'='kafka','properties.bootstrap.servers'='localhost:9092','topic'='user_behavior','format'='avro-confluent','avro-confluent.schema-registry.url'='http://localhost:8081','avro-confluent.schema-registry.subject'='user_behavior')

如上所示,我并不是想要知道如何在flink sql中连接confluent schema registry,而是想要通过catalog获取schema 
registry上对应topic的schema信息,以便我不需要手动输入那些fields字段,如同JdbcCatalog那样,直接使用已有表一样,我想直接使用topic,而不需要定义DDL











在 2020-12-15 00:34:03,"guoliubi...@foxmail.com"  写道:
>Confluent Schema Registry参考这个
>https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro-confluent.html
> 
>
>
>
>guoliubi...@foxmail.com
> 
>发件人: 陈帅
>发送时间: 2020-12-14 23:33
>收件人: user-zh@flink.apache.org
>主题: 如何让FlinkSQL访问到阿里云MaxCompute上的表?
>如何让FlinkSQL访问到阿里云MaxCompute上的表?
>又或者是Confluent Schema Registry上那些带schema的kafka topic? 
>需要自己定义Catalog吗?有相关的教程和资料么?谢谢!


pyflink udf 依赖问题

2020-12-14 文章 magichuang
大家好,我刚才修改了集群的flink配置文件  python.client.executable: /usr/bin/python3.6   
,将集群三台机器的Python默认  改为了python3.6 

flink版本:1.11flink  on  yarn集群搭建的,通过per-job模式提交任务的

提交命令:flink run -m yarn-cluster   -ytm 2048  -s  2 -p 2 -py traffic.py

截图地址

看报错是目录权限问题,是通过  先离线安装到cached_dir 然后提交的,flink集群搭建时用的是root用户,提交任务时也是root用户

截图地址




pyflink udf 依赖问题

2020-12-14 文章 magichuang
大家好,我刚才修改了集群的flink配置文件  python.client.executable: /usr/bin/python3.6   
,将集群三台机器的Python默认  改为了python3.6 

flink版本:1.11flink  on  yarn集群搭建的,通过per-job模式提交任务的

提交命令:flink run -m yarn-cluster   -ytm 2048  -s  2 -p 2 -py traffic.py

提交之后出现了新的问题,看报错是目录权限问题,是通过  先离线安装到cached_dir 然后提交的

Re: 关于 stream-stream Interval Join 的问题

2020-12-14 文章 赵一旦
补充,实际FROM_UNIXTIME应该返回 TIMESTAMP WITH LOCAL TIME ZONE
这个类型。(然后FlinkSQL可以自己转为TIMESTAMP)。

此外,关于分窗,除了offset这种显示的由用户来解决时区分窗以外。还可以通过支持 TIMESTAMP WITH LOCAL TIME ZONE
类型作为 event time 实现,当然内部当然还是通过offset实现,只是FlinkSQL语法层可以基于支持 TIMESTAMP WITH
LOCAL TIME ZONE 作为eventtime来实现这种效果。

如上是个人观点哈。。。

赵一旦  于2020年12月15日周二 上午11:29写道:

> 这个问题很早前我提过,没人在意,或者说大家没觉得这是个问题。但实际上如果和DataStream
> API去对比的话,FlinkSQL的这种表现肯定是有问题的。
>
> 换种说法,FlinkSQL通过更改ts方式实现了UTC+8时区下的分窗的合理性,但其“实现方式”本身就是“代价”,即使用了不合理的ts,ui上当然就展示不合理的ts。
>
> 这本来是应该在window分窗处基于offset实现的功能。
>
> 赵一旦  于2020年12月15日周二 上午11:22写道:
>
>> 这个问题是存在的,只不过不清楚算不算bug,可能只算是FlinkSQL在这部分处理的不足感觉。
>>
>> 之所以出现这个问题,是因为你用了FROM_UNIXTIME等函数,这些函数会自动根据当前时区将ts转换为TIMESTAMP(这导致了在time
>> attribute部分不应该使用这种函数,否则会导致watermark显示超前8小时的问题)。
>>
>> 但是呢,目前不这么做好像也还不行。因为分窗必须基于time
>> attribute,同时flinkSQL目前不支持offset的指定,因此只能基于这种方式去间接实现分窗的正确性。
>>
>>
>> 
>> 比如:ts=0,代表的是 1970-1-1 00:00:00,FROM_UNIXTIME(0) 返回的是 1970-1-1 08:00:00
>> UTC+8
>> 这个时间,而这个时间作为TIMESTAMP类型的eventtime被处理了,但TIMESTAMP本身是无时区含义的,你却给了它一个带时区含义的日期。这导致分窗可以正常按照中国人习惯分,但从底层考虑却不对,因为这个时间点,它的ts变为了28800s。因为flinkSQL将你那个eventtime按照UTC+0转换出来的ts就是ts=28800(s)。
>>
>> 
>> 按我说的话,要么继续这么用,你忽略ui上的watermark,这并不影响你的代码逻辑和业务逻辑。
>> 要么就是调整不使用那些函数,这样可以保证watermark那个ui展示正确,但是却导致小时/天窗口出问题,并暂时无解(因为不支持offset窗口)。
>>
>>
>>
>> macia kk  于2020年12月11日周五 下午3:04写道:
>>
>>> 你用的是哪个版本的Flink呢?
>>> -
>>> 1.11.2
>>>
>>> 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
>>> 所以你的binlog是怎么读进来的呢?自定义的format?
>>> -
>>> ts 就是时间戳
>>>
>>> bsTableEnv.executeSql("""
>>>   CREATE TABLE input_database (
>>> `table` STRING,
>>> `database` STRING,
>>> `data` ROW(
>>>   reference_id STRING,
>>>   transaction_sn STRING,
>>>   transaction_type BIGINT,
>>>   merchant_id BIGINT,
>>>   transaction_id BIGINT,
>>>   status BIGINT
>>>  ),
>>> ts BIGINT,
>>> event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),
>>> WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
>>>  ) WITH (
>>>'connector.type' = 'kafka',
>>>'connector.version' = '0.11',
>>>'connector.topic' = 'mytopic',
>>>'connector.properties.bootstrap.servers' = '',
>>>'format.type' = 'json'
>>>  )
>>> )
>>>
>>>
>>>
>>> ```
>>>
>>>
>>>
>>> Benchao Li  于2020年12月10日周四 下午6:14写道:
>>>
>>> > 你用的是哪个版本的Flink呢?
>>> >
>>> > 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
>>> > 所以你的binlog是怎么读进来的呢?自定义的format?
>>> >
>>> > macia kk  于2020年12月10日周四 上午1:06写道:
>>> >
>>> > > 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS
>>> event_time
>>> > -
>>> > > INTERVAL 'x' HOUR
>>> > >
>>> > >  发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness
>>> > >
>>> > > 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x,
>>> > > 能够反推出来数据的 currentMaxTimestamp
>>> > >
>>> > > currentMaxTimestamp = watermark + maxOutOfOrderness
>>> > >
>>> > > 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快
>>> > > 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。
>>> > >
>>> > >
>>> > > 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55
>>> > >
>>> > >
>>> >
>>> {"table":"transaction_tab_0122","database":"main_db","transaction_type":1,"transaction_id":11,"reference_id":"11","transaction_sn":"1","merchant_id":1,"status":1,"event_time":"
>>> > > *2020-12-10T01:02:24Z*"}
>>> > >
>>> > > UI 上显示的 watermark 是 1607555031000(Your time zone:
>>> 2020年12月10日星期四早上7点02分
>>> > > GMT+08:00)
>>> > >
>>> > > 这个 watermark 是未来的时间 
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > > macia kk  于2020年12月9日周三 下午11:36写道:
>>> > >
>>> > > > 感谢 一旦 和 Benchao
>>> > > >
>>> > > >   1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join
>>> 上的数据,但是我
>>> > > Job
>>> > > > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。
>>> > > >
>>> > > > val result = bsTableEnv.sqlQuery("""
>>> > > >SELECT *
>>> > > >FROM (
>>> > > >   SELECT t1.`table`, t1.`database`, t1.transaction_type,
>>> > > t1.transaction_id,
>>> > > > t1.reference_id, t1.transaction_sn, t1.merchant_id,
>>> > > t1.status, t1.event_time
>>> > > >   FROM main_db as t1
>>> > > >   LEFT JOIN main_db as t2
>>> > > >   ON t1.reference_id = t2.reference_id
>>> > > >   WHERE t1.event_time >= t2.event_time + INTERVAL '5'
>>> MINUTES
>>> > > >AND t1.event_time <= t2.event_time - INTERVAL '5'
>>> MINUTES
>>> > > >)
>>> > > >   """.stripMargin)
>>> > > >
>>> > > > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到
>>> > > >
>>> > > > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source
>>> > > > subtask的watermark。
>>> > > > ---
>>> > > > 这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的
>>> watermark
>>> > > > 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog.
>>> > > >
>>> > > > 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为
>>> > event
>>> > > > 

Re: 关于 stream-stream Interval Join 的问题

2020-12-14 文章 赵一旦
这个问题很早前我提过,没人在意,或者说大家没觉得这是个问题。但实际上如果和DataStream
API去对比的话,FlinkSQL的这种表现肯定是有问题的。
换种说法,FlinkSQL通过更改ts方式实现了UTC+8时区下的分窗的合理性,但其“实现方式”本身就是“代价”,即使用了不合理的ts,ui上当然就展示不合理的ts。

这本来是应该在window分窗处基于offset实现的功能。

赵一旦  于2020年12月15日周二 上午11:22写道:

> 这个问题是存在的,只不过不清楚算不算bug,可能只算是FlinkSQL在这部分处理的不足感觉。
>
> 之所以出现这个问题,是因为你用了FROM_UNIXTIME等函数,这些函数会自动根据当前时区将ts转换为TIMESTAMP(这导致了在time
> attribute部分不应该使用这种函数,否则会导致watermark显示超前8小时的问题)。
>
> 但是呢,目前不这么做好像也还不行。因为分窗必须基于time
> attribute,同时flinkSQL目前不支持offset的指定,因此只能基于这种方式去间接实现分窗的正确性。
>
>
> 
> 比如:ts=0,代表的是 1970-1-1 00:00:00,FROM_UNIXTIME(0) 返回的是 1970-1-1 08:00:00
> UTC+8
> 这个时间,而这个时间作为TIMESTAMP类型的eventtime被处理了,但TIMESTAMP本身是无时区含义的,你却给了它一个带时区含义的日期。这导致分窗可以正常按照中国人习惯分,但从底层考虑却不对,因为这个时间点,它的ts变为了28800s。因为flinkSQL将你那个eventtime按照UTC+0转换出来的ts就是ts=28800(s)。
>
> 
> 按我说的话,要么继续这么用,你忽略ui上的watermark,这并不影响你的代码逻辑和业务逻辑。
> 要么就是调整不使用那些函数,这样可以保证watermark那个ui展示正确,但是却导致小时/天窗口出问题,并暂时无解(因为不支持offset窗口)。
>
>
>
> macia kk  于2020年12月11日周五 下午3:04写道:
>
>> 你用的是哪个版本的Flink呢?
>> -
>> 1.11.2
>>
>> 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
>> 所以你的binlog是怎么读进来的呢?自定义的format?
>> -
>> ts 就是时间戳
>>
>> bsTableEnv.executeSql("""
>>   CREATE TABLE input_database (
>> `table` STRING,
>> `database` STRING,
>> `data` ROW(
>>   reference_id STRING,
>>   transaction_sn STRING,
>>   transaction_type BIGINT,
>>   merchant_id BIGINT,
>>   transaction_id BIGINT,
>>   status BIGINT
>>  ),
>> ts BIGINT,
>> event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),
>> WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
>>  ) WITH (
>>'connector.type' = 'kafka',
>>'connector.version' = '0.11',
>>'connector.topic' = 'mytopic',
>>'connector.properties.bootstrap.servers' = '',
>>'format.type' = 'json'
>>  )
>> )
>>
>>
>>
>> ```
>>
>>
>>
>> Benchao Li  于2020年12月10日周四 下午6:14写道:
>>
>> > 你用的是哪个版本的Flink呢?
>> >
>> > 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
>> > 所以你的binlog是怎么读进来的呢?自定义的format?
>> >
>> > macia kk  于2020年12月10日周四 上午1:06写道:
>> >
>> > > 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS
>> event_time
>> > -
>> > > INTERVAL 'x' HOUR
>> > >
>> > >  发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness
>> > >
>> > > 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x,
>> > > 能够反推出来数据的 currentMaxTimestamp
>> > >
>> > > currentMaxTimestamp = watermark + maxOutOfOrderness
>> > >
>> > > 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快
>> > > 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。
>> > >
>> > >
>> > > 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55
>> > >
>> > >
>> >
>> {"table":"transaction_tab_0122","database":"main_db","transaction_type":1,"transaction_id":11,"reference_id":"11","transaction_sn":"1","merchant_id":1,"status":1,"event_time":"
>> > > *2020-12-10T01:02:24Z*"}
>> > >
>> > > UI 上显示的 watermark 是 1607555031000(Your time zone:
>> 2020年12月10日星期四早上7点02分
>> > > GMT+08:00)
>> > >
>> > > 这个 watermark 是未来的时间 
>> > >
>> > >
>> > >
>> > >
>> > >
>> > > macia kk  于2020年12月9日周三 下午11:36写道:
>> > >
>> > > > 感谢 一旦 和 Benchao
>> > > >
>> > > >   1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join
>> 上的数据,但是我
>> > > Job
>> > > > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。
>> > > >
>> > > > val result = bsTableEnv.sqlQuery("""
>> > > >SELECT *
>> > > >FROM (
>> > > >   SELECT t1.`table`, t1.`database`, t1.transaction_type,
>> > > t1.transaction_id,
>> > > > t1.reference_id, t1.transaction_sn, t1.merchant_id,
>> > > t1.status, t1.event_time
>> > > >   FROM main_db as t1
>> > > >   LEFT JOIN main_db as t2
>> > > >   ON t1.reference_id = t2.reference_id
>> > > >   WHERE t1.event_time >= t2.event_time + INTERVAL '5'
>> MINUTES
>> > > >AND t1.event_time <= t2.event_time - INTERVAL '5' MINUTES
>> > > >)
>> > > >   """.stripMargin)
>> > > >
>> > > > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到
>> > > >
>> > > > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source
>> > > > subtask的watermark。
>> > > > ---
>> > > > 这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 watermark
>> > > > 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog.
>> > > >
>> > > > 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为
>> > event
>> > > > time,但是有的表又没有这个字段,会导致解析的时候直接报错.
>> > > >
>> > > > 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark
>> > > > 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为.
>> > > >
>> > > >
>> > > > Thanks and best regards
>> > > >
>> > > >
>> > > > Benchao Li  于2020年12月9日周三 上午10:24写道:
>> > > >
>> > > >> Hi macia,
>> > > >>
>> > > >> 一旦回答的基本比较完整了。
>> > > >> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。
>> > > >> 如果是两侧都有数据,watermark不前进,也都可以正常输出。
>> > > >>
>> > > >> 

Re: 关于 stream-stream Interval Join 的问题

2020-12-14 文章 赵一旦
这个问题是存在的,只不过不清楚算不算bug,可能只算是FlinkSQL在这部分处理的不足感觉。

之所以出现这个问题,是因为你用了FROM_UNIXTIME等函数,这些函数会自动根据当前时区将ts转换为TIMESTAMP(这导致了在time
attribute部分不应该使用这种函数,否则会导致watermark显示超前8小时的问题)。

但是呢,目前不这么做好像也还不行。因为分窗必须基于time
attribute,同时flinkSQL目前不支持offset的指定,因此只能基于这种方式去间接实现分窗的正确性。



比如:ts=0,代表的是 1970-1-1 00:00:00,FROM_UNIXTIME(0) 返回的是 1970-1-1 08:00:00
UTC+8
这个时间,而这个时间作为TIMESTAMP类型的eventtime被处理了,但TIMESTAMP本身是无时区含义的,你却给了它一个带时区含义的日期。这导致分窗可以正常按照中国人习惯分,但从底层考虑却不对,因为这个时间点,它的ts变为了28800s。因为flinkSQL将你那个eventtime按照UTC+0转换出来的ts就是ts=28800(s)。


按我说的话,要么继续这么用,你忽略ui上的watermark,这并不影响你的代码逻辑和业务逻辑。
要么就是调整不使用那些函数,这样可以保证watermark那个ui展示正确,但是却导致小时/天窗口出问题,并暂时无解(因为不支持offset窗口)。



macia kk  于2020年12月11日周五 下午3:04写道:

> 你用的是哪个版本的Flink呢?
> -
> 1.11.2
>
> 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
> 所以你的binlog是怎么读进来的呢?自定义的format?
> -
> ts 就是时间戳
>
> bsTableEnv.executeSql("""
>   CREATE TABLE input_database (
> `table` STRING,
> `database` STRING,
> `data` ROW(
>   reference_id STRING,
>   transaction_sn STRING,
>   transaction_type BIGINT,
>   merchant_id BIGINT,
>   transaction_id BIGINT,
>   status BIGINT
>  ),
> ts BIGINT,
> event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),
> WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
>  ) WITH (
>'connector.type' = 'kafka',
>'connector.version' = '0.11',
>'connector.topic' = 'mytopic',
>'connector.properties.bootstrap.servers' = '',
>'format.type' = 'json'
>  )
> )
>
>
>
> ```
>
>
>
> Benchao Li  于2020年12月10日周四 下午6:14写道:
>
> > 你用的是哪个版本的Flink呢?
> >
> > 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
> > 所以你的binlog是怎么读进来的呢?自定义的format?
> >
> > macia kk  于2020年12月10日周四 上午1:06写道:
> >
> > > 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS
> event_time
> > -
> > > INTERVAL 'x' HOUR
> > >
> > >  发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness
> > >
> > > 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x,
> > > 能够反推出来数据的 currentMaxTimestamp
> > >
> > > currentMaxTimestamp = watermark + maxOutOfOrderness
> > >
> > > 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快
> > > 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。
> > >
> > >
> > > 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55
> > >
> > >
> >
> {"table":"transaction_tab_0122","database":"main_db","transaction_type":1,"transaction_id":11,"reference_id":"11","transaction_sn":"1","merchant_id":1,"status":1,"event_time":"
> > > *2020-12-10T01:02:24Z*"}
> > >
> > > UI 上显示的 watermark 是 1607555031000(Your time zone: 2020年12月10日星期四早上7点02分
> > > GMT+08:00)
> > >
> > > 这个 watermark 是未来的时间 
> > >
> > >
> > >
> > >
> > >
> > > macia kk  于2020年12月9日周三 下午11:36写道:
> > >
> > > > 感谢 一旦 和 Benchao
> > > >
> > > >   1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join
> 上的数据,但是我
> > > Job
> > > > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。
> > > >
> > > > val result = bsTableEnv.sqlQuery("""
> > > >SELECT *
> > > >FROM (
> > > >   SELECT t1.`table`, t1.`database`, t1.transaction_type,
> > > t1.transaction_id,
> > > > t1.reference_id, t1.transaction_sn, t1.merchant_id,
> > > t1.status, t1.event_time
> > > >   FROM main_db as t1
> > > >   LEFT JOIN main_db as t2
> > > >   ON t1.reference_id = t2.reference_id
> > > >   WHERE t1.event_time >= t2.event_time + INTERVAL '5' MINUTES
> > > >AND t1.event_time <= t2.event_time - INTERVAL '5' MINUTES
> > > >)
> > > >   """.stripMargin)
> > > >
> > > > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到
> > > >
> > > > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source
> > > > subtask的watermark。
> > > > ---
> > > > 这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 watermark
> > > > 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog.
> > > >
> > > > 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为
> > event
> > > > time,但是有的表又没有这个字段,会导致解析的时候直接报错.
> > > >
> > > > 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark
> > > > 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为.
> > > >
> > > >
> > > > Thanks and best regards
> > > >
> > > >
> > > > Benchao Li  于2020年12月9日周三 上午10:24写道:
> > > >
> > > >> Hi macia,
> > > >>
> > > >> 一旦回答的基本比较完整了。
> > > >> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。
> > > >> 如果是两侧都有数据,watermark不前进,也都可以正常输出。
> > > >>
> > > >> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source
> > > subtask见到的最大的watermark
> > > >> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay
> > > 10个小时,这个已经会导致
> > > >> 你的没有join到的数据下发会延迟很多了。
> > > >>
> > > >> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。
> > > >>
> > > >> 赵一旦  于2020年12月9日周三 上午10:15写道:
> > > >>
> > > >> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。
> > > >> >
> > > >> >
> > > >> >
> > > 

Re: Re: pyflink是否可调试

2020-12-14 文章 Xingbo Huang
Hi,

客户端写的python代码会在一个客户端的进程通过py4j调用flink
java的代码去编译你的作业(这里有一个客户端的Python进程,只是用来编译代码生成pipeline),
然后实际运行时,非python代码部分(就是非各种udf的逻辑是运行在JVM里面的),python部分(各种udf)是运行在另一个Python进程里面的。
实际上,在下一个1.13版本我们有考虑在你本地运行调试的时候,将实际运行python代码的重连回客户端那个编译Python代码的进程,这样可以更利于你的本地调试,就不用开启remote
debug了。

Best,
Xingbo

guoliubi...@foxmail.com  于2020年12月15日周二 上午10:29写道:

> Hi Xingbo,
>
> 多谢指导,亲测有效。
> 源python文件运行一会儿本身就结束运行了,过阵子后才会跳到断点里。
> 所以源python文件只是做了个提交的动作,实际执行都是异步执行,是否可以这么理解?
> 如果是的话,之前已经运行过很多次源python文件,是否本地已经在后台异步运行了多次?是的话是否能监控到这些任务?
>
>
>
> guoliubi...@foxmail.com
>
> 发件人: Xingbo Huang
> 发送时间: 2020-12-15 09:59
> 收件人: user-zh
> 主题: Re: pyflink是否可调试
> Hi,
> 想要调试可以使用的方式为
> 1. 在PyCharm里创建一个Python Remote Debug
> run -> Python Remote Debug -> + -> 选择一个端口(比如6789)
>
> 2. 安装pydevd-pycharm(你PyCharm使用的python解释器)
> pip install pydevd-pycharm
> 其实上一步那个界面也有指导你安装了
>
> 3.  将以下代码插入到你要断点的udaf的代码前面(这段代码其实也是来自第一步创建remote debug里面)
> import pydevd_pycharm
> pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True,
> stderrToServer=True)
>
> 例如
> @udaf(result_type=DataTypes.INT(), func_type="pandas")
> def mean_udaf(v):
> import pydevd_pycharm
> pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True,
> stderrToServer=True)
> v.mean()
>
> 4. 在你的PyCharm里面点击Debug启动你刚刚创建的Remote Debug Server
>
> 5. 直接点击Run运行你的作业,这个时候会断点在你的udaf的代码里面了
>
> Best,
> Xingbo
>
> guoliubi...@foxmail.com  于2020年12月15日周二 上午9:25写道:
>
> >
> >
> 基于flink1.12.0版本的pyflink开发了一个程序,使用了udaf,想在本地的PyCharm环境调试该功能,在udaf的第一行打了断点,但是没法进入。
> > 可以确认程序有正确运行,因为sink到kafka里看了是有数据的。
> > 请问是否需要什么配置才能进行调试。
> >
> >
> >
> > guoliubi...@foxmail.com
> >
>


两条流去重后再关联出现不符合预期数据

2020-12-14 文章 hdxg1101300...@163.com
你好:
我在使用flink 
1.11.2版本的时候使用flinksql处理两条流。因为两条流都是数据库变更信息,我需要取最新的数据关联;所以分别对两条流做row_number=1
(SELECT [column_list] FROM (
   SELECT [column_list],
 ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
   ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name) 
WHERE rownum = 1)
去重后再左关联; 前期当左流变更都没有问题,结果符合预期;当右流有数据时,第一条数据也符合预期,但是右流在发送一条数据出现变更时,出现了一条不符合预期的数据;

left> 
(true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
 15:59:50,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
right> (true,3774bca649224249bdbcb8e7c80b52f9,1,0,8,160793279)
left> 
(false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
 15:59:50,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
left> 
(true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
 15:59:50,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279)
left> 
(false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
 15:59:50,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279)
left> 
(true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279)
right> (false,3774bca649224249bdbcb8e7c80b52f9,1,0,8,160793279)
right> (true,3774bca649224249bdbcb8e7c80b52f9,1,0,1,1607933006000)
left> 
(false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279)
left> 
(true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
left> 
(false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
left> 
(true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,1,1607933006000)

第1行左流来了数据显示true,此时右流没有数据结果是null;
第2行右流来了数据,显示为true(单独打印了右流的结果);
第3行显示左流撤回;
第4行 左右流数据关联上,正常显示;
第5行 左流数据变更,数据撤回;
第6行 显示变更后的数据;
第7行 右流数据变化,数据撤回;
第8行 显示右流最新的结果;
第9行 因为右流数据变化 所以左流(关联数据)撤回;
第10行 和第11 行 不符合预期;
正常应该是 右流发生变化 第9行 因为右流数据变化 所以左流(关联数据)撤回;然后右流的最新数据和左流产生结果;显示第12行数据才对;
所以想请教一下大家;

1607998361520> 
(true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
1607998361520> 
(false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)

我的sql语句如下
String sql = "SELECT a.sheetId 
sheetId,sheetCode,sheetStatus,sheetType,sheetScene,sheetObject," +
" 
sheetPresentation,sheetAcceptor,sheetHandler,updateTime,dealTaskId,provided,satisfied,score,operateTime
 " +
" from (SELECT 
sheetId,sheetCode,sheetStatus,sheetType,sheetScene,sheetObject," +
" sheetPresentation,sheetAcceptor,sheetHandler,updateTime,dealTaskId" +
" FROM (SELECT *," +
" ROW_NUMBER() OVER (PARTITION BY sheetId ORDER BY operateTime 
desc) AS rownum " +
"   FROM sheetMain)" +
" WHERE rownum = 1 ) a" +
" left JOIN " +
" (select sheetId,provided,satisfied,score,operateTime from (SELECT *," 
+
" ROW_NUMBER() OVER (PARTITION BY sheetId ORDER BY operateTime 
desc) AS rownum " +
"   FROM sheetAnswers)" +
" WHERE rownum = 1 ) c" +
" ON a.sheetId = c.sheetId " ;



hdxg1101300...@163.com


Re: Re: pyflink是否可调试

2020-12-14 文章 guoliubi...@foxmail.com
Hi Xingbo,

多谢指导,亲测有效。
源python文件运行一会儿本身就结束运行了,过阵子后才会跳到断点里。
所以源python文件只是做了个提交的动作,实际执行都是异步执行,是否可以这么理解?
如果是的话,之前已经运行过很多次源python文件,是否本地已经在后台异步运行了多次?是的话是否能监控到这些任务?



guoliubi...@foxmail.com
 
发件人: Xingbo Huang
发送时间: 2020-12-15 09:59
收件人: user-zh
主题: Re: pyflink是否可调试
Hi,
想要调试可以使用的方式为
1. 在PyCharm里创建一个Python Remote Debug
run -> Python Remote Debug -> + -> 选择一个端口(比如6789)
 
2. 安装pydevd-pycharm(你PyCharm使用的python解释器)
pip install pydevd-pycharm
其实上一步那个界面也有指导你安装了
 
3.  将以下代码插入到你要断点的udaf的代码前面(这段代码其实也是来自第一步创建remote debug里面)
import pydevd_pycharm
pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True,
stderrToServer=True)
 
例如
@udaf(result_type=DataTypes.INT(), func_type="pandas")
def mean_udaf(v):
import pydevd_pycharm
pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True,
stderrToServer=True)
v.mean()
 
4. 在你的PyCharm里面点击Debug启动你刚刚创建的Remote Debug Server
 
5. 直接点击Run运行你的作业,这个时候会断点在你的udaf的代码里面了
 
Best,
Xingbo
 
guoliubi...@foxmail.com  于2020年12月15日周二 上午9:25写道:
 
>
> 基于flink1.12.0版本的pyflink开发了一个程序,使用了udaf,想在本地的PyCharm环境调试该功能,在udaf的第一行打了断点,但是没法进入。
> 可以确认程序有正确运行,因为sink到kafka里看了是有数据的。
> 请问是否需要什么配置才能进行调试。
>
>
>
> guoliubi...@foxmail.com
>


Re:回复: Re: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.

2020-12-14 文章 kandy.wang






@ guoliubin85
感谢回复,和你说的差不多,问题已经搞定。

在 2020-12-14 13:02:54,"guoliubi...@foxmail.com"  写道:
>不好意思我没说清楚。
>我这边用的是这样的SQL可以运作,你可以参考下。
>CREATE TABLE `someTable` (
>  eventTime TIMESTAMP(3),
>  WATERMARK FOR eventTime AS eventTime
>)
>eventTime是java的Long类型,包含毫秒,SQL里可以直接转成TIMESTAMP
>select someFunc(field)
>from `someTable`
>group by TUMBLE(eventTime, INTERVAL '1' SECOND)
>
>
>
>guoliubi...@foxmail.com
> 
>发件人: kandy.wang
>发送时间: 2020-12-14 11:23
>收件人: user-zh
>主题: Re:回复: Window aggregate can only be defined over a time attribute column, 
>but TIMESTAMP(3) encountered.
>hi guoliubin85:
>一样的报错:
> 
>Flink SQL> select mid,code,floor_id,TUMBLE_START(time_local/1000, INTERVAL '1' 
>MINUTE) as log_minute,count(1) pv
> 
>> from lightart_expose
> 
>> where code is not null and floor_id is not null
> 
>> group by mid,code,floor_id,TUMBLE(time_local/1000, INTERVAL '1' 
>> MINUTE);[ERROR] Could not execute SQL statement. Reason:
> 
>org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$TUMBLE' 
>to arguments of type '$TUMBLE(, )'. Supported 
>form(s): '$TUMBLE(, )'
> 
>'$TUMBLE(, , )'
> 
> 
> 
>> group by mid,code,floor_id,TUMBLE(time_local/1000, INTERVAL '1' 
>> MINUTE);[ERROR] Could not execute SQL statement. Reason:
>org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$TUMBLE' 
>to arguments of type '$TUMBLE(, )'. Supported 
>form(s): '$TUMBLE(, )'
>'$TUMBLE(, , )'
> 
>在 2020-12-14 10:41:12,"guoliubi...@foxmail.com"  写道:
>>TUMBLE第一个参数需要的就是bigint,你这边time_local 直接用就好,不用转另外TIMESTAMP
>>
>>
>>
>>guoliubi...@foxmail.com
>> 
>>发件人: kandy.wang
>>发送时间: 2020-12-14 10:28
>>收件人: user-zh
>>主题: Window aggregate can only be defined over a time attribute column, but 
>>TIMESTAMP(3) encountered.
>>[ERROR] Could not execute SQL statement. 
>>Reason:org.apache.flink.table.api.TableException: Window aggregate can only 
>>be defined over a time attribute column, but TIMESTAMP(3) encountered.
>> 
>> 
>>SQL 如下:
>>create temporary view expose as
>> 
>>select  
>> 
>>mid
>> 
>>,time_local
>> 
>>,TO_TIMESTAMP(FROM_UNIXTIME(time_local / 1000, '-MM-dd HH:mm:ss')) as 
>>log_ts
>> 
>>,proctime
>> 
>>from hive.temp.kafka_table
>> 
>>;
>>time_local 是bigint
>> 
>> 
>> 
>>select TUMBLE_START(log_ts, INTERVAL '1' MINUTE) as log_minute,count(1) pv
>> 
>>from expose
>> 
>>group by TUMBLE(log_ts, INTERVAL '1' MINUTE);
>> 
>> 
>>window agg的字段报错,如何解决。


Re: pyflink是否可调试

2020-12-14 文章 Xingbo Huang
Hi,
想要调试可以使用的方式为
1. 在PyCharm里创建一个Python Remote Debug
run -> Python Remote Debug -> + -> 选择一个端口(比如6789)

2. 安装pydevd-pycharm(你PyCharm使用的python解释器)
pip install pydevd-pycharm
其实上一步那个界面也有指导你安装了

3.  将以下代码插入到你要断点的udaf的代码前面(这段代码其实也是来自第一步创建remote debug里面)
import pydevd_pycharm
pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True,
stderrToServer=True)

例如
@udaf(result_type=DataTypes.INT(), func_type="pandas")
def mean_udaf(v):
import pydevd_pycharm
pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True,
stderrToServer=True)
v.mean()

4. 在你的PyCharm里面点击Debug启动你刚刚创建的Remote Debug Server

5. 直接点击Run运行你的作业,这个时候会断点在你的udaf的代码里面了

Best,
Xingbo

guoliubi...@foxmail.com  于2020年12月15日周二 上午9:25写道:

>
> 基于flink1.12.0版本的pyflink开发了一个程序,使用了udaf,想在本地的PyCharm环境调试该功能,在udaf的第一行打了断点,但是没法进入。
> 可以确认程序有正确运行,因为sink到kafka里看了是有数据的。
> 请问是否需要什么配置才能进行调试。
>
>
>
> guoliubi...@foxmail.com
>


java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

2020-12-14 文章 水静于止
java.io.IOException: unable to open JDBC writer
at 
org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:61)
at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:112)
at 
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:73)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at 
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getConnection(SimpleJdbcConnectionProvider.java:52)
at 
org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.establishConnection(AbstractJdbcOutputFormat.java:66)
at 
org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:59)
... 13 more
Flinksql 上传jar 到服务器上一直启动失败,找不到jdbc驱动,而使用 自定义sink 则正常使用

发送自 Windows 10 版邮件应用



java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

2020-12-14 文章 ????????
java.io.IOException: unable to open JDBC writer
 at 
org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:61)
 at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:112)
 at 
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)
 at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
 at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
 at 
org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:73)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
 at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
 at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
 at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
 at 
sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:264)
 at 
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getConnection(SimpleJdbcConnectionProvider.java:52)
 at 
org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.establishConnection(AbstractJdbcOutputFormat.java:66)
 at 
org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:59)
 ... 13 more


jar ?? ?? mysql jdbc  flinkSQL 
job?? ??sink  ??

Application Mode job on K8S集群,无法缩容问题

2020-12-14 文章 lichunguang
Flink1.11.1版本job以Application
Mode在K8S集群上运行,TaskManager都是申请的单独pod,分配在不同的node上,导致资源使用较低时,K8S集群无法进行自动缩容操作,浪费资源。

1、TaskManager申请的单独pod逻辑
org.apache.flink.kubernetes.KubernetesResourceManager.requestKubernetesPod()

2、K8S集群自动缩容条件,如AWS集群
 

3、资源使用低的节点无法调度
 


大家有关注这个问题吗?可以讨论下



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink1.11.1版本Application Mode job on K8S集群,too old resource version问题

2020-12-14 文章 lichunguang
Flink1.11.1版本job以Application Mode在K8S集群上运行,jobmanager每个小时会重启一次,报错【Fatal error
occurred in
ResourceManager.io.fabric8.kubernetes.client.KubernetesClientException: too
old resource version】

pod重启:
 

重启原因:
2020-12-10 07:21:19,290 ERROR
org.apache.flink.kubernetes.KubernetesResourceManager[] - Fatal
error occurred in ResourceManager.
io.fabric8.kubernetes.client.KubernetesClientException: too old resource
version: 247468999 (248117930)
  at
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_202]
  at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_202]
  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
2020-12-10 07:21:19,291 ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
error occurred in the cluster entrypoint.
io.fabric8.kubernetes.client.KubernetesClientException: too old resource
version: 247468999 (248117930)
  at
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_202]
  at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_202]
  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]


网上查的原因是因为:
org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient类中212行

@Override
public KubernetesWatch watchPodsAndDoCallback(Map labels,
PodCallbackHandler podCallbackHandler) {
return new KubernetesWatch(
this.internalClient.pods()
.withLabels(labels)
.watch(new 
KubernetesPodsWatcher(podCallbackHandler)));
}

而ETCD中只会保留一段时间的version信息
【 I think it's standard behavior of Kubernetes to give 410 after some time
during watch. It's usually client's responsibility to handle it. In the
context of a watch, it will return HTTP_GONE when you ask to see changes for
a resourceVersion that is too old - i.e. when it can no longer tell you what
has changed since that version, since too many things have changed. In that
case, you'll need to start again, by not specifying a resourceVersion in
which case the watch will send you the current state of the thing you are
watching and then send updates from that point.】

大家有没遇到相同的问题,是怎么处理的?我有几个处理方式,希望能跟大家一起讨论一下。




--
Sent from: http://apache-flink.147419.n8.nabble.com/


pyflink是否可调试

2020-12-14 文章 guoliubi...@foxmail.com
基于flink1.12.0版本的pyflink开发了一个程序,使用了udaf,想在本地的PyCharm环境调试该功能,在udaf的第一行打了断点,但是没法进入。
可以确认程序有正确运行,因为sink到kafka里看了是有数据的。
请问是否需要什么配置才能进行调试。



guoliubi...@foxmail.com


Re: flink-shaded-hadoop-2-uber*-* 版本确定问题

2020-12-14 文章 Jacob
谢谢回复!

这个文档我也有查看

前几日在flink1.9-1.12各个客户端测试提交job时候发现
对于1.10+的版本,我手动导入export HADOOP_CLASSPATH=`hadoop
classpath`,没有效果,各种报错,基本都是Hadoop相关类、方法不存在(NoSuchMethod之类错误),把pom文件改来改去依然无用,后来只在pom文件中导入依赖:flink-shaded-hadoop-2-uber*-*,竟然可以正常提交并运行job了。




--
Sent from: http://apache-flink.147419.n8.nabble.com/

java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

2020-12-14 文章 ????????
java.io.IOException: unable to open JDBC writer
at 
org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:61)
at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:112)
at 
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:73)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
at 
java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at 
sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at 
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getConnection(SimpleJdbcConnectionProvider.java:52)
at 
org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.establishConnection(AbstractJdbcOutputFormat.java:66)
at 
org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:59)
... 13 more


jar ?? ?? mysql jdbc  flinkSQL 
job?? ??sink  ??

java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

2020-12-14 文章 ????????
jar ?? ?? mysql jdbc  flinkSQL 
job?? ??sink  ??

temporal table join SQL报错

2020-12-14 文章 guoliubi...@foxmail.com
我在使用flink 1.12.0,在按博客里的例子实现temporal table join
https://flink.apache.org/news/2020/12/10/release-1.12.0.html#table-apisql-support-for-temporal-table-joins-in-sql
 

构造了类似的表格后,写了类似SQL
-- Event-time temporal table join
SELECT 
o.order_id,
o.order_time, 
o.amount * r.currency_rate AS amount,
r.currency
FROM orders AS o, latest_rates FOR SYSTEM_TIME AS OF o.order_time r
ON o.currency = r.currency;
上传到flink后,提示
py4j.protocol.Py4JJavaError: An error occurred while calling o2.sqlQuery.
: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered 
"ON" at line 8, column 5.
Was expecting one of:

"EXCEPT" ...
"FETCH" ...
"GROUP" ...

在把最后一行的ON改为WHERE后,这个SQL可以解析过去了,但是执行时报了另外的问题
py4j.protocol.Py4JJavaError: An error occurred while calling o153.executeInsert.
: org.apache.flink.table.api.ValidationException: Currently the join key in 
Temporal Table Join can not be empty.
at 
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)

请问要如何正确的写该SQL,是否有关于temporal table join的更详细文档?


guoliubi...@foxmail.com


回复: 如何让FlinkSQL访问到阿里云MaxCompute上的表?

2020-12-14 文章 guoliubi...@foxmail.com
Confluent Schema Registry参考这个
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro-confluent.html
 



guoliubi...@foxmail.com
 
发件人: 陈帅
发送时间: 2020-12-14 23:33
收件人: user-zh@flink.apache.org
主题: 如何让FlinkSQL访问到阿里云MaxCompute上的表?
如何让FlinkSQL访问到阿里云MaxCompute上的表?
又或者是Confluent Schema Registry上那些带schema的kafka topic? 
需要自己定义Catalog吗?有相关的教程和资料么?谢谢!


Re:Can't load kafka-0.10 DynamicTableSourceFactory

2020-12-14 文章 hailongwang
Hi,
  Kafka010TableSourceSinkFactory 是表示使用 legacy property keys,
  Kafka010DynamicTableFactory 表示使用 new property keys[1]
从你的报错来看,是使用了 new 
property。所以需要在/META-INF/services/org.apache.flink.table.factories.Factory 
存在值:org.apache.flink.streaming.connectors.kafka.table.Kafka010DynamicTableFactory
并且 classload 里需要存在类 
org.apache.flink.streaming.connectors.kafka.table.Kafka010DynamicTableFactory


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory




Best,
Hailong
在 2020-12-14 19:41:06,"meijie.work"  写道:
>Hi, Community
>
>当我使用flink命令启动一个使用了kafka-0.10 source 表的任务报了如下错误。 我使用的flink版本是1.11.2
>
>> Could not find any factory for identifier 'kafka-0.10' that implements
>'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
>classpath.
>
>我确定我的jar文件里有以下相关目录和文件。
>
>1. /META-INF/services/org.apache.flink.table.factories.TableFactory
>content: 
>org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory
>
>2. [自定义] /META-INF/services/org.apache.flink.table.factories.Factory
>content: 
>com.voxlearning.xue.continues.sink.clickhouse.ClickHouseDynamicTableFactory
>
>3. org/apache/flink/streaming/connectors/kafka
>content: Kafka010TableSourceSinkFactory.class
>
>所以,我还忽略了什么排查点来确认问题呢? 如果需要任何额外信息请让我知道,谢谢。
>
>-- 
>
>*Best Regards*
>*Jeremy Mei*


Re:回复: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.

2020-12-14 文章 hailongwang
Hi,
   Window agg 使用可以参考[1],其中 first argument 可以是 Process time 或者 Eventime。


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#group-windows


Best,
Hailong

在 2020-12-14 09:41:12,"guoliubi...@foxmail.com"  写道:
>TUMBLE第一个参数需要的就是bigint,你这边time_local 直接用就好,不用转另外TIMESTAMP
>
>
>
>guoliubi...@foxmail.com
> 
>发件人: kandy.wang
>发送时间: 2020-12-14 10:28
>收件人: user-zh
>主题: Window aggregate can only be defined over a time attribute column, but 
>TIMESTAMP(3) encountered.
>[ERROR] Could not execute SQL statement. 
>Reason:org.apache.flink.table.api.TableException: Window aggregate can only be 
>defined over a time attribute column, but TIMESTAMP(3) encountered.
> 
> 
>SQL 如下:
>create temporary view expose as
> 
>select  
> 
>mid
> 
>,time_local
> 
>,TO_TIMESTAMP(FROM_UNIXTIME(time_local / 1000, '-MM-dd HH:mm:ss')) as 
>log_ts
> 
>,proctime
> 
>from hive.temp.kafka_table
> 
>;
>time_local 是bigint
> 
> 
> 
>select TUMBLE_START(log_ts, INTERVAL '1' MINUTE) as log_minute,count(1) pv
> 
>from expose
> 
>group by TUMBLE(log_ts, INTERVAL '1' MINUTE);
> 
> 
>window agg的字段报错,如何解决。


如何让FlinkSQL访问到阿里云MaxCompute上的表?

2020-12-14 文章 陈帅
如何让FlinkSQL访问到阿里云MaxCompute上的表?
又或者是Confluent Schema Registry上那些带schema的kafka topic? 
需要自己定义Catalog吗?有相关的教程和资料么?谢谢!

Re: pyflink udf依赖引用问题

2020-12-14 文章 hxbks2ks
Hi,

你的图片挂了,可以找个图床工具贴上去,这里附上链接或者直接添上来文字内容。

Best,
Xingbo

magichuang  于2020年12月14日周一 下午8:41写道:

> 在使用pyflink  udf时,引用第三方依赖时出现了一下问题
>
>
> 如果直接通过  pythonapp.py运行是没有问题的,可以出结果,但是提交到集群上就不行了
>
> flink版本  1.11,是flink  onyarn集群部署的,通过  per-job模式提交任务,集群一共三台机器,在其中一台上提交的
>
> 下面是代码截图
>
> 麻烦给看一下,上面那个报错是因为什么呀?需要在其他两台机器上提前把cache_dir弄好吗?
> 我看日志是有尝试在hdfs上新创建一个临时文件夹来放置依赖tar.gz文件的
>
>
> 祝好~
>
>
>


Can't load kafka-0.10 DynamicTableSourceFactory

2020-12-14 文章 meijie.work
Hi, Community

当我使用flink命令启动一个使用了kafka-0.10 source 表的任务报了如下错误。 我使用的flink版本是1.11.2

> Could not find any factory for identifier 'kafka-0.10' that implements
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
classpath.

我确定我的jar文件里有以下相关目录和文件。

1. /META-INF/services/org.apache.flink.table.factories.TableFactory
content: 
org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory

2. [自定义] /META-INF/services/org.apache.flink.table.factories.Factory
content: 
com.voxlearning.xue.continues.sink.clickhouse.ClickHouseDynamicTableFactory

3. org/apache/flink/streaming/connectors/kafka
content: Kafka010TableSourceSinkFactory.class

所以,我还忽略了什么排查点来确认问题呢? 如果需要任何额外信息请让我知道,谢谢。

-- 

*Best Regards*
*Jeremy Mei*


pyflink udf依赖引用问题

2020-12-14 文章 magichuang
在使用pyflink  udf时,引用第三方依赖时出现了一下问题




如果直接通过  pythonapp.py运行是没有问题的,可以出结果,但是提交到集群上就不行了

flink版本  1.11,是flink  onyarn集群部署的,通过  per-job模式提交任务,集群一共三台机器,在其中一台上提交的

下面是代码截图

麻烦给看一下,上面那个报错是因为什么呀?需要在其他两台机器上提前把cache_dir弄好吗?  
我看日志是有尝试在hdfs上新创建一个临时文件夹来放置依赖tar.gz文件的




祝好~




回复:Pandas UDF处理过的数据sink问题

2020-12-14 文章 郭刘斌
多谢。现在用逗号拼合字符串推mq,剩下交给后续程序自行处理了



---原始邮件---
发件人: "Xingbo Huang"https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#row-based-operations
[2] https://issues.apache.org/jira/browse/FLINK-20479
[3] https://issues.apache.org/jira/browse/FLINK-20507

Best,
Xingbo


Best,
Xingbo

guoliubi...@foxmail.com https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#table-functions
 Best,
 Xingbo

 guoliubi...@foxmail.com https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
  Vectorized Python aggregate functions takes one or more pandas.Series 
as
  the inputs and return one scalar value as output.
  Note The return type does not support RowType and MapType for the time
  being.
  udaf仅允许返回单个值,所以我再udaf里把所有值用‘,’连接后用STRING方式返回了,将这个STRING直接sink掉是没问题的。
  现在是后面用另一个udf把这个string再做拆分,代码大概如下:
  @udf(result_type=DataTypes.ROW(
  [DataTypes.FIELD('value1', 
DataTypes.BIGINT()),
  DataTypes.FIELD('value2', 
DataTypes.INT())]))
  def flattenStr(inputStr):
  ret_array = [int(x) for x in 
inputStr.split(',')]
  return Row(ret_array[0], ret_array[1])
  t_env.create_temporary_function("flattenStr", 
flattenStr)aggregate_table
 =
  order_table.window(tumble_window) \
  .group_by("w") \
  .select("**调用udaf** as aggValue")
  result_table = aggregate_table.select("flattenStr(aggValue) as 
retValue")
 
 
 
result_table.select(result_table.retValue.flatten).execute_insert("csvSink")上传到flink编译没有问题,但运行是报错了,不太明白报错的含义,不知道是否是udf返回的类型不正确引起的
  Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
  at
 
 
org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.getPythonEnv(AbstractPythonScalarFunctionOperator.java:99)
  at
 
 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.createPythonEnvironmentManager(AbstractPythonFunctionOperator.java:306)
  at
 
 
org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.createPythonFunctionRunner(AbstractStatelessFunctionOperator.java:151)
  at
 
 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:122)
 
 
  guoliubi...@foxmail.com
 
  发件人: Wei Zhong
  发送时间: 2020-12-14 10:38
  收件人: user-zh
  主题: Re: Pandas UDF处理过的数据sink问题
  Hi Lucas,
 
  是这样的,这个Pandas的输出类型是一列Row, 而你现在的sink需要接收的是一列BIGINT和一列INT。
 
  你可以尝试将sql语句改成以下形式:
 
  select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1)
  from `some_source`
  group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
 
  此外你这里实际是Pandas UDAF的用法吧,如果是的话则需要把”@udf”换成”@udaf”
 
  Best,
  Wei
 
   在 2020年12月13日,13:13,Lucas 

Re: Re: Pandas UDF处理过的数据sink问题

2020-12-14 文章 Xingbo Huang
Hi,

join
udtf和你认为的两个table的join是不一样的,只是因为udtf会返回多条结果,需要左边的一条输入和多条的udtf输出拼接在一起,所以用join。对于udf只会返回一条输出,所以是一对一的拼接。如果你udtf只返回一条结果,这个拼接和udf就是类似的。udtf是能直接扩展列的,而udf,
udaf都没法直接扩展列的,除非你能使用row-based的那套operation[1],不过这个feature在1.13
PyFlink才会支持[2]。

当然了,你可以按照weizhong的方式,一个udaf,直接返回一个Row类型的数据,然后再去get(0),get(1)的方式去拿也可以,不过在1.12只有普通的Python
UDAF是支持返回一个Row类型的,Pandas
UDAF没法支持你返回一个Row类型的结果,不过这个feature在master(1.13)上已经支持了[3]。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#row-based-operations
[2] https://issues.apache.org/jira/browse/FLINK-20479
[3] https://issues.apache.org/jira/browse/FLINK-20507

Best,
Xingbo


Best,
Xingbo

guoliubi...@foxmail.com  于2020年12月14日周一 下午2:29写道:

> Hi xingbo,
> 文档中给的例子udtf需要和join一起使用,但是我现在不需要join,只是单纯的转换结果
> 如果直接调用了udtf后sink,会提示
> Cause: Different number of columns.
> Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]
>
> Sink schema:  [buyQtl: BIGINT, aveBuy: INT]
>
>
> guoliubi...@foxmail.com
>
> 发件人: Xingbo Huang
> 发送时间: 2020-12-14 11:38
> 收件人: user-zh
> 主题: Re: Re: Pandas UDF处理过的数据sink问题
> Hi,
> 你想要一列变多列的话,你需要使用UDTF了,具体使用方式,你可以参考文档[1]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#table-functions
> Best,
> Xingbo
>
> guoliubi...@foxmail.com  于2020年12月14日周一
> 上午11:00写道:
>
> > 多谢你的回复。这个问题已处理好了,确实如你所说需要将@udf换成@udaf。
> > 但现在有另一个问题,根据文档
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
> > Vectorized Python aggregate functions takes one or more pandas.Series as
> > the inputs and return one scalar value as output.
> > Note The return type does not support RowType and MapType for the time
> > being.
> > udaf仅允许返回单个值,所以我再udaf里把所有值用‘,’连接后用STRING方式返回了,将这个STRING直接sink掉是没问题的。
> > 现在是后面用另一个udf把这个string再做拆分,代码大概如下:
> > @udf(result_type=DataTypes.ROW(
> > [DataTypes.FIELD('value1', DataTypes.BIGINT()),
> >  DataTypes.FIELD('value2', DataTypes.INT())]))
> > def flattenStr(inputStr):
> > ret_array = [int(x) for x in inputStr.split(',')]
> > return Row(ret_array[0], ret_array[1])
> > t_env.create_temporary_function("flattenStr", flattenStr)aggregate_table
> =
> > order_table.window(tumble_window) \
> > .group_by("w") \
> > .select("**调用udaf** as aggValue")
> > result_table = aggregate_table.select("flattenStr(aggValue) as retValue")
> >
> >
> result_table.select(result_table.retValue.flatten).execute_insert("csvSink")上传到flink编译没有问题,但运行是报错了,不太明白报错的含义,不知道是否是udf返回的类型不正确引起的
> > Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
> > at
> >
> org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.getPythonEnv(AbstractPythonScalarFunctionOperator.java:99)
> > at
> >
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.createPythonEnvironmentManager(AbstractPythonFunctionOperator.java:306)
> > at
> >
> org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.createPythonFunctionRunner(AbstractStatelessFunctionOperator.java:151)
> > at
> >
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:122)
> >
> >
> > guoliubi...@foxmail.com
> >
> > 发件人: Wei Zhong
> > 发送时间: 2020-12-14 10:38
> > 收件人: user-zh
> > 主题: Re: Pandas UDF处理过的数据sink问题
> > Hi Lucas,
> >
> > 是这样的,这个Pandas的输出类型是一列Row, 而你现在的sink需要接收的是一列BIGINT和一列INT。
> >
> > 你可以尝试将sql语句改成以下形式:
> >
> > select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1)
> > from `some_source`
> > group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
> >
> > 此外你这里实际是Pandas UDAF的用法吧,如果是的话则需要把”@udf”换成”@udaf”
> >
> > Best,
> > Wei
> >
> > > 在 2020年12月13日,13:13,Lucas  写道:
> > >
> > > 使用了1.12.0的flink,3.7的python。自定义了一个pandas的UDF,定义大概如下
> > >
> > > @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
> > > result_type=DataTypes.ROW(
> > > [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
> > >  DataTypes.FIELD('aveBuy', DataTypes.INT())),
> > > func_type='pandas')
> > > def orderCalc(code, amount):
> > >
> > >df = pd.DataFrame({'code': code, 'amount': amount})
> > > # pandas 数据处理后输入另一个dataframe output
> > > return (output['buyQtl'], output['aveBuy'])
> > >
> > >
> > > 定义了csv的sink如下
> > >
> > > create table csvSink (
> > >buyQtl BIGINT,
> > >aveBuy INT
> > > ) with (
> > >'connector.type' = 'filesystem',
> > >'format.type' = 'csv',
> > >'connector.path' = 'e:/output'
> > > )
> > >
> > >
> > >
> > > 然后进行如下的操作:
> > >
> > > result_table = t_env.sql_query("""
> > > select orderCalc(code, amount)
> > > from `some_source`
> > > group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
> > > """)
> > > result_table.execute_insert("csvSink")
> > >
> > >
> > >
> > > 在执行程序的时候提示没法入库
> > >
> > > py4j.protocol.Py4JJavaError: An error occurred while calling
> > > o98.executeInsert.
> > >
> > 

Re:Re: 关于flink-sql 元数据问题

2020-12-14 文章 夜思流年梦






Hi ,
 的确tableEnv.execute 和tableEnv.executeSql 这两个方法不该一起用
 现在会报另一个错,去掉tableEnv.execute 方法,
代码如下:


final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(MAX_RESTART, 
Time.of(DURING_RESTART, TimeUnit.SECONDS)));
env.enableCheckpointing(CHECKPOINT_INTERVAL);
StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env,bsSettings);
Catalog catalog = new 
HiveCatalog(CATALOG_NAME,DEFAULTDATABASE,HIVECONFDIR);
tableEnv.registerCatalog(CATALOG_NAME,catalog);
tableEnv.useCatalog("myhive");
tableEnv.executeSql("create table .");


No jobs included in application


目前的场景是想把flink-sql 建表的操作规范到我们自己的平台上,不想通过SQL-client 
或者代码的方式提交建表,这样我们自己可以做一些类似于建表规范,元数据统一管理等一些功能;用户通过在平台上建表,然后调用flink的api来实现建表操作; 
现在已经使用的是Hive-catalog,只是我们关注的是建表操作放到我们自己的平台上;


所以想采用上述代码的方式,通过平台调用,直接建表,但是现在这个应用提交是不成功的(要么报没有算子,要么报没有Jobs);
然后就算上述方式提交成功,好像也没法知道我的这个表建成功与否,只知道应用提交成功没有,不像HTTP请求有对应返回值,好像没有类似的 Rest 
Api的方式来做这个事情













在 2020-12-14 11:42:35,"Rui Li"  写道:
>Hi,
>
>调用tableEnv.executeSql("create table
>.")以后表就已经创建了,不需要再调用tableEnv.execute。execute方法已经deprecate,建议统一使用executeSql哈
>
>On Fri, Dec 11, 2020 at 7:23 PM JasonLee <17610775...@163.com> wrote:
>
>> hi
>> Flink SQL 建的表支持用 hive 的 catalog 来管理元数据,是否可以满足你的需求 ?
>>
>>
>>
>> -
>> Best Wishes
>> JasonLee
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>
>
>-- 
>Best regards!
>Rui Li


Flink1.11.1版本Application Mode job on K8S集群,too old resource version问题

2020-12-14 文章 lichunguang
Flink1.11.1版本job以Application Mode在K8S集群上运行,jobmanager每个小时会重启一次,报错【Fatal error
occurred in
ResourceManager.io.fabric8.kubernetes.client.KubernetesClientException: too
old resource version】

pod重启:
 

重启原因:
2020-12-10 07:21:19,290 ERROR
org.apache.flink.kubernetes.KubernetesResourceManager[] - Fatal
error occurred in ResourceManager.
io.fabric8.kubernetes.client.KubernetesClientException: too old resource
version: 247468999 (248117930)
  at
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_202]
  at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_202]
  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
2020-12-10 07:21:19,291 ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
error occurred in the cluster entrypoint.
io.fabric8.kubernetes.client.KubernetesClientException: too old resource
version: 247468999 (248117930)
  at
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_202]
  at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_202]
  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]


网上查的原因是因为:
org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient类中212行

@Override
public KubernetesWatch watchPodsAndDoCallback(Map labels,
PodCallbackHandler podCallbackHandler) {
return new KubernetesWatch(
this.internalClient.pods()
.withLabels(labels)
.watch(new 
KubernetesPodsWatcher(podCallbackHandler)));
}

而ETCD中只会保留一段时间的version信息
【 I think it's standard behavior of Kubernetes to give 410 after some time
during watch. It's usually client's responsibility to handle it. In the
context of a watch, it will return HTTP_GONE when you ask to see changes for
a resourceVersion that is too old - i.e. when it can no longer tell you what
has changed since that version, since too many things have changed. In that
case, you'll need to start again, by not specifying a resourceVersion in
which case the watch will send you the current state of the thing you are
watching and then send updates from that point.】

大家有没遇到相同的问题,是怎么处理的?我有几个处理方式,希望能跟大家一起讨论一下。




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Application Mode job on K8S集群,无法缩容问题

2020-12-14 文章 lichunguang
Flink1.11.1版本job以Application
Mode在K8S集群上运行,TaskManager都是申请的单独pod,分配在不同的node上,导致资源使用较低时,K8S集群无法进行自动缩容操作,浪费资源。

1、TaskManager申请的单独pod逻辑
org.apache.flink.kubernetes.KubernetesResourceManager.requestKubernetesPod()

2、K8S集群自动缩容条件,如AWS集群
 

3、资源使用低的节点无法调度
 


大家有关注这个问题吗?可以讨论下



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink-SQL数据倾斜处理

2020-12-14 文章 yanggang_it_job
hello,通过FlinkSQL实现了一个简单的业务:Kafka to hive
但是任务不定期报错,某个TM异常挂掉,经排查可以得到如下日志
Direct buffer memory. The direct out-of-memory error has occurred. This can 
mean two things: either job(s) require(s) a larger size of JVM direct mOpt>  or 
there is a direct memory leak. The direct memory can be allocated by user code 
or some of its dependencies. In this case 
'taskmanager.memory.task.off-heap.size' configuration option should be 
increased. Flink framework and its dependencies also consume the direct memory, 
mostly for network communication. The most of network memory is managed by 
Flink and should not result in out-of-memory error. In certain special cases, 
in particular for jobs with high parallelism, the framework may require more 
direct memory which is not managed by Flink. In this case 
'taskmanager.memory.framework.off-heap.size' configuration option should be 
increased. If the error persists then there is probably a direct memory leak in 
user code or some of its dependencies which has to be investigated and fixed. 
The task executor has to be shutdown...


可以通过两个参数进行调节,但是感觉这不是根本原因,现在怀疑是数据倾斜导致,为什么会任务是数据倾斜呢?,请看下图:
1.对内存使用曲线:

可以得出每个TM的堆内存(HeapMemory)使用相差很大。


2.直接内存曲线图

可以得出每个TM的直接内存(DirectMemory)使用相差很大。

问1:如果是数据倾斜导致的异常,那么在FlinkSQL场景中怎么处理?
问2:如果不是数据倾斜导致的,那么是什么原因导致的?解决方案是什么?

Best to you !!!

Re: flink-cdc-connector 使用场景和限制是什么?

2020-12-14 文章 Chesnay Schepler

Moving to chinese user mailing ist.

On 12/14/2020 3:19 AM, 陈帅 wrote:
传统CDC方式是通过 mysql -> debezium -> kafka, 
这样便于DBA管控资源,因为像postgres库需要创建slot资源,但如果像 
flink-cdc-connector 
每(几)张表就创建一个CDC流的话对数据库的资源要求很高,而且不可控。所以我的理解flink-cdc-connector更适合少量的即席cdc处理,而不是大规模的cdc处理,不知我的理解对不对?