Restoring a job from a checkpoint when the Hive tables already exist

2021-01-05 Thread amen...@163.com
hi everyone,

flink version: 1.12.0
job DAG: kafka -> hive

I ran into a problem today. The first time the job started, it successfully created the Hive tables through the HiveCatalog, ingested data normally and took checkpoints normally. However, the Kafka topic contained dirty data, and after three restarts the job still could not get past it and ended up in the Failed state. I then changed the job's Kafka configuration to enable the option that skips rows that fail to parse, and tried to restore the job from the checkpoint with -s
hdfs:///xxx/checkpoints/chk-122. The first exception reported was that the Kafka and Hive tables had already been created in the specified catalog.database, which is indeed the case. My questions are:

1. After the job failed, I adjusted it and resubmitted it from the most recent checkpoint. In theory the checkpoint state should have recorded the table creation, so the tables would not be created in Hive again, but clearly that is not what happens.
2. Judging from the error, the tables are created first and only then is the job state restored from the checkpoint. How can the exception for the already-created Hive table, Caused by:
AlreadyExistsException(message:Table kafka_source_table already exists), be avoided?
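
One way to sidestep the AlreadyExistsException on resubmission is to make the DDL conditional on what is already in the catalog, since the HiveCatalog persists table metadata in the Hive Metastore rather than in checkpoint state. A minimal sketch, assuming Flink 1.12 and a HiveCatalog named myhive; the conf dir, database and the datagen table below are placeholders standing in for the real Kafka DDL:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class ResumeWithExistingHiveTables {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Placeholder catalog settings; point hiveConfDir at the real hive-site.xml directory.
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/etc/hive/conf");
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");

        // Only run the CREATE TABLE when the table is not already in the metastore,
        // so re-submitting the job (e.g. restoring with -s chk-122) stays idempotent.
        if (!hive.tableExists(new ObjectPath("default", "kafka_source_table"))) {
            tEnv.executeSql(
                    "CREATE TABLE kafka_source_table (" +
                    "  f0 STRING" +
                    ") WITH (" +
                    "  'connector' = 'datagen'" +   // stand-in for the real Kafka connector options
                    ")");
        }
    }
}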

best,
amenhub





Re: Flink SQL DDL Schema: nested JSON in CSV

2021-01-03 Thread amen...@163.com
Flink version: 1.12.0



 
发件人: amen...@163.com
发送时间: 2021-01-03 16:09
收件人: user-zh
主题: Flink SQL DDL Schema csv嵌套json
hi everyone,
 
zhangsan|man|28|goodst...@gmail.com|{"math":98, "language":{"english":89, 
"french":95}}|china|beijing
 
This is one record from the Kafka message queue. When I create the Kafka DDL and define a schema for it, the following exception is thrown:
Exception in thread "main" java.lang.IllegalArgumentException: Only simple 
types are supported in the second level nesting of fields 'alex_1' but was: 
ROW<`english` INT, `french` INT>
at 
org.apache.flink.formats.csv.CsvRowSchemaConverter.validateNestedField(CsvRowSchemaConverter.java:220)
at 
org.apache.flink.formats.csv.CsvRowSchemaConverter.convertType(CsvRowSchemaConverter.java:197)
at 
org.apache.flink.formats.csv.CsvRowSchemaConverter.convert(CsvRowSchemaConverter.java:145)
at 
org.apache.flink.formats.csv.CsvRowDataDeserializationSchema$Builder.(CsvRowDataDeserializationSchema.java:98)
at 
org.apache.flink.formats.csv.CsvFormatFactory$1.createRuntimeDecoder(CsvFormatFactory.java:79)
at 
org.apache.flink.formats.csv.CsvFormatFactory$1.createRuntimeDecoder(CsvFormatFactory.java:71)
at 
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(KafkaDynamicSource.java:401)
at 
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:184)
at 
org.apache.flink.table.planner.sources.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:262)
at 
org.apache.flink.table.planner.sources.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:73)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:101)
...
...
 
DDL statement:
CREATE TABLE kafka_source (
name STRING,
sex STRING,
age INT,
mail STRING,
alex_1 ROW<`math` INT, `language` ROW<`english` INT, `french` INT>>,
country STRING,
city STRING
) WITH (
'connector' = 'kafka',
'topic' = 'topic5-r1p3',
'properties.bootstrap.servers' = '10.1.128.63:9092',
'properties.group.id' = 'group5',
'format' = 'csv',
'csv.field-delimiter' = '|',
'scan.startup.mode' = 'group-offsets'
)
 
It looks like an issue with nested ROW types, yet when the format is switched to json there is no problem. How should the schema be defined for JSON nested inside CSV data like this?
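
One possible workaround (a sketch, not verified against 1.12): since the csv format only accepts simple types at the second nesting level, the JSON column can be declared as a plain STRING in the CSV schema and parsed downstream, e.g. with a UDF. The embedded commas are harmless because the field delimiter is '|'; everything except the STRING column and the quote option is copied from the DDL above:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CsvWithEmbeddedJson {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql(
                "CREATE TABLE kafka_source (" +
                "  name STRING," +
                "  sex STRING," +
                "  age INT," +
                "  mail STRING," +
                "  alex_1 STRING," +      // keep the raw JSON text instead of a nested ROW
                "  country STRING," +
                "  city STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'topic5-r1p3'," +
                "  'properties.bootstrap.servers' = '10.1.128.63:9092'," +
                "  'properties.group.id' = 'group5'," +
                "  'format' = 'csv'," +
                "  'csv.field-delimiter' = '|'," +
                // keep the quotes inside the JSON text from being treated as CSV quoting
                "  'csv.disable-quote-character' = 'true'," +
                "  'scan.startup.mode' = 'group-offsets'" +
                ")");
    }
}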
 
best,
amenhub
 
 



Re: Re: Flink-1.11.1 streaming write to filesystem: partition commit issue

2020-12-24 Thread amen...@163.com
A follow-up question: when writing to the filesystem the commit relies on checkpoints. Is the number of files that can be committed after each checkpoint determined by the parallelism? I notice that exactly three files are committed every time a checkpoint completes.



 
From: amen...@163.com
Sent: 2020-12-24 18:47
To: user-zh
Subject: Re: Re: Flink-1.11.1 streaming write to filesystem: partition commit issue
That single remark cleared everything up, thanks for the reply @冯嘉伟

I had first been testing submissions in sql-client, which is why I overlooked this. Thanks.
 
best,
amenhub
 
 
 
From: 冯嘉伟
Sent: 2020-12-24 18:39
To: user-zh
Subject: Re: Flink-1.11.1 streaming write to filesystem: partition commit issue
Is checkpointing enabled?
Part files can be in one of three states:
In-progress : The part file that is currently being written to is
in-progress
Pending : Closed (due to the specified rolling policy) in-progress files
that are waiting to be committed
Finished : On successful checkpoints (STREAMING) or at the end of input
(BATCH) pending files transition to “Finished”
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html
<https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html>
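
A minimal sketch of what "enable checkpointing" looks like when the same job is built programmatically rather than in sql-client (for sql-client the equivalent is typically setting execution.checkpointing.interval in the Flink configuration); the interval value is arbitrary:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FilesystemSinkWithCheckpoints {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // In streaming mode, pending files only move to "finished" on successful checkpoints,
        // so the filesystem sink never commits anything without this.
        env.enableCheckpointing(60_000L); // 60s, an arbitrary example interval

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // ... create the kafka_source and hdfs_sink tables from this thread with executeSql(),
        // then run the INSERT INTO hdfs_sink ... statement as usual.
    }
}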
  
--
Sent from: http://apache-flink.147419.n8.nabble.com/



Re: Flink-1.11.1 streaming write to filesystem: partition commit issue

2020-12-24 Thread amen...@163.com
Now the problem is that apparently none of the partitions are being committed at all. Why is that?



 
From: amen...@163.com
Sent: 2020-12-24 17:04
To: user-zh
Subject: Flink-1.11.1 streaming write to filesystem: partition commit issue
hi everyone,

I have recently been validating a requirement: streaming Kafka data into the HDFS filesystem. After successfully submitting the demo from the official Flink 1.11 documentation to YARN, the partition directories and files are generated as expected, but I have some questions about partition commits.

Problem description:
At 15:37 I checked HDFS and saw many partition directories of the same format, such as [/user/flink/order/dt=2020-03-13/hour=14] and [/user/flink/order/dt=2020-03-14/hour=21]. Inside the hour=14 directory the part file was in-progress. The documentation says a partition is committed once the current system time exceeds the partition creation time plus the delay. When I checked again at 16:37 and 16:38, the part file under hour=14 was still in-progress. The reason turned out to be that I had written more data to Kafka at 16:07; at that point the creation time of the part files in all partition directories changed to 16:07, so the partitions whose part files had been created at 15:37 now have to wait until 17:07 to be committed. (That is what seems to happen, at least in theory.)

So here is the question: as the DDL below shows, my partitions are day + hour, and my understanding is that the partition commit time is computed from the creation time of the hour partition directory. Is that right? If so, why does writing data at 16:07 affect the commit of partitions created at 15:37, pushing all of them back to 17:07?

Also, when I wrote data at 16:07, apart from the partition that should actually receive that data, the part files in all other partition directories had their creation time changed to 16:07, while the hour directories themselves did not change.

This description is long and possibly confusing; I am probably not yet familiar enough with streaming file writes to fully understand what is going on, so I hope someone can help explain. Thanks!
 
source ddl:
CREATE TABLE kafka_source (
order_id STRING,
order_sales DOUBLE,
update_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'flink-kafka',
'properties.bootstrap.servers' = '10.3.15.128:9092',
'properties.group.id' = 'kafka_hdfs',
'format' = 'json',
'scan.startup.mode' = 'group-offsets'
)
sink ddl:
CREATE TABLE hdfs_sink (
order_id STRING,
order_sales DOUBLE,
dt STRING,
`hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
'connector' = 'filesystem',
'path' = 'hdfs:///user/flink/order',
'format' = 'json',
'sink.partition-commit.delay' = '1h',
'sink.partition-commit.policy.kind' = 'success-file'
)
transform dml:
INSERT INTO hdfs_sink 
SELECT 
order_id,
order_sales,
DATE_FORMAT(update_time, 'yyyy-MM-dd'),
DATE_FORMAT(update_time, 'HH')
FROM kafka_source
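
For reference, a sketch of the same sink declared with the partition-time commit trigger, which bases the commit decision on the time extracted from the dt/hour values plus the watermark rather than on processing time; this assumes the source declares a watermark (e.g. on update_time) and is otherwise copied from the DDL above:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PartitionTimeCommitSink {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql(
                "CREATE TABLE hdfs_sink (" +
                "  order_id STRING," +
                "  order_sales DOUBLE," +
                "  dt STRING," +
                "  `hour` STRING" +
                ") PARTITIONED BY (dt, `hour`) WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 'hdfs:///user/flink/order'," +
                "  'format' = 'json'," +
                // commit when the watermark passes partition time + delay, not process time
                "  'sink.partition-commit.trigger' = 'partition-time'," +
                "  'partition.time-extractor.timestamp-pattern' = '$dt $hour:00:00'," +
                "  'sink.partition-commit.delay' = '1h'," +
                "  'sink.partition-commit.policy.kind' = 'success-file'" +
                ")");
    }
}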
 
best,
amenhub
 
 




Re: Flink on YARN: how to restrict Flink jobs to a fixed set of YARN nodes

2020-12-21 Thread amen...@163.com
This sounds like a question for YARN rather than Flink...



 
From: yujianbo
Sent: 2020-12-21 16:43
To: user-zh
Subject: Flink on YARN: how to restrict Flink jobs to a fixed set of YARN nodes
Hi all:
    How can Flink on YARN be made to run jobs only on a fixed set of YARN nodes?
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Does Flink 1.12 support joining Kafka with a JDBC dimension table?

2020-12-20 Thread amen...@163.com
Learned something new today, thanks!



 
From: Leonard Xu
Sent: 2020-12-21 15:01
To: user-zh
Subject: Re: Does Flink 1.12 support joining Kafka with a JDBC dimension table?
>  
> Put that way it is much clearer. So if I want an event-time join against the JDBC dimension table, I first need to feed the JDBC table's changelog into Kafka and join against that, which is also the example in the docs, right?
 
Yes.
 
 
> The approach you describe sounds like a processing-time join~
Yes. The processing-time dimension-table join and the well-known lookup join share the same syntax
because they have the same semantics: at runtime each record is joined against the latest version of the dimension table. They only differ in implementation. A lookup
join is just one implementation, in which every incoming record queries the database at runtime (semantically, it joins the latest dimension data). There are other implementations as well, for example keeping the latest dimension data in state and joining each incoming record against that state at runtime.
 
Best,
Leonard
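
A sketch of the lookup-style join described above, with the tables from earlier in this thread reduced to the relevant parts; the orders table gets a processing-time attribute instead of the event-time watermark, and the connection details are the placeholders from the original mail:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ProcTimeLookupJoin {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql(
                "CREATE TABLE orders (" +
                "  order_id STRING," +
                "  currency STRING," +
                "  amount INT," +
                "  proctime AS PROCTIME()" +       // processing-time attribute for the lookup join
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'topic_flink'," +
                "  'properties.bootstrap.servers' = '10.3.12.113:9092'," +
                "  'properties.group.id' = 'flink'," +
                "  'format' = 'json'," +
                "  'scan.startup.mode' = 'latest-offset'" +
                ")");

        tEnv.executeSql(
                "CREATE TABLE latest_rates (" +
                "  currency STRING," +
                "  rate DECIMAL(38, 10)," +
                "  PRIMARY KEY (currency) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://10.3.12.113:3306/base'," +
                "  'username' = 'root'," +
                "  'password' = 'root1234'," +
                "  'table-name' = 'latest_rates'," +
                "  'lookup.cache.max-rows' = '1'," +
                "  'lookup.cache.ttl' = '1min'" +
                ")");

        // Each order queries (or hits the lookup cache of) the JDBC table at processing time.
        tEnv.executeSql(
                "SELECT o.order_id, o.amount * r.rate AS amount, r.currency " +
                "FROM orders AS o " +
                "LEFT JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime AS r " +
                "ON o.currency = r.currency").print();
    }
}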
 
 
 
>   
> 
> 
> 
> 发件人: Leonard Xu
> 发送时间: 2020-12-21 14:44
> 收件人: user-zh
> 主题: Re: Flink-1.12支持kafka join jdbc维表吗
> Hi 
>> 在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished
> 
> 这是正常的,jdbc 
> connector实现的表就是bounded的,只会scan一次,一次读完,之后数据库表的数据更新是没有去捕捉的,connector也没有很好的办法去实时监控数据库表的更新并广播到下游节点。
> 
> 如果想要有获取实时更新的维表并做基于event time语义的维表关联,那么推荐的方式就是接入数据库表的binlog(changelog), 用主流去 
> temporal join changelog流 实现关联维表的准确版本。
> 
> 另外,如果对维表实时性要求不高,也没有延迟join的需求,你也可以用运行时lookup的方式完成维表join(即:JOIN latest_rates 
> FOR SYSTEM_TIME AS OF o.proctime),这种方式是在每条主流的数据到来时去查询一次外部DB,这种方式支持你参数中写的   
> 'lookup.cache.max-rows' = '1',   'lookup.cache.ttl' = ‘1min’ 优化。
> 
> 
> 祝好,
> Leonard 
> 
 


Re: Re: Does Flink 1.12 support joining Kafka with a JDBC dimension table?

2020-12-20 Thread amen...@163.com
Thanks for the reply @Leonard Xu,

Put that way it is much clearer. So if I want an event-time join against the JDBC dimension table, I first need to feed the JDBC table's changelog into Kafka and join against that, which is also the example in the docs, right?

>>> 另外,如果对维表实时性要求不高,也没有延迟join的需求,你也可以用运行时lookup的方式完成维表join(即:JOIN latest_rates 
>>> FOR SYSTEM_TIME AS OF o.proctime),这种方式是在每条主流的数据到来时去查询一次外部DB,这种方式支持你参数中写的   
>>> 'lookup.cache.max-rows' = '1',   'lookup.cache.ttl' = ‘1min’ 优化。
The approach you describe sounds like a processing-time join~

best,
amenhub


 
发件人: Leonard Xu
发送时间: 2020-12-21 14:44
收件人: user-zh
主题: Re: Flink-1.12支持kafka join jdbc维表吗
Hi 
> 在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished
 
这是正常的,jdbc 
connector实现的表就是bounded的,只会scan一次,一次读完,之后数据库表的数据更新是没有去捕捉的,connector也没有很好的办法去实时监控数据库表的更新并广播到下游节点。
 
如果想要有获取实时更新的维表并做基于event time语义的维表关联,那么推荐的方式就是接入数据库表的binlog(changelog), 用主流去 
temporal join changelog流 实现关联维表的准确版本。
 
另外,如果对维表实时性要求不高,也没有延迟join的需求,你也可以用运行时lookup的方式完成维表join(即:JOIN latest_rates FOR 
SYSTEM_TIME AS OF o.proctime),这种方式是在每条主流的数据到来时去查询一次外部DB,这种方式支持你参数中写的   
'lookup.cache.max-rows' = '1',   'lookup.cache.ttl' = ‘1min’ 优化。
 
 
祝好,
Leonard 
 


Does Flink 1.12 support joining Kafka with a JDBC dimension table?

2020-12-20 Thread amen...@163.com
hi,

Is it supported for Kafka to join the JDBC dimension table's data directly, rather than joining the JDBC changelog?

After declaring a primary key and a watermark on the JDBC table and using the event-time join syntax from the docs, I found that once the job starts, the JDBC operator reads the entire dimension table in one go and its state becomes FINISHED. In that case, no matter how the dimension table changes afterwards, the Kafka stream can never join against the updated dimension data, right?

CREATE TABLE orders (
order_id STRING,
currency STRING,
amount INT,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time
) WITH (
'connector' = 'kafka',
'topic' = 'topic_flink',
'properties.bootstrap.servers' = '10.3.12.113:9092',
'properties.group.id' = 'flink',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)

CREATE TABLE latest_rates (
currency STRING,
rate DECIMAL(38, 10),
currency_time TIMESTAMP(3),
WATERMARK FOR currency_time AS currency_time,
PRIMARY KEY (currency) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.3.12.113:3306/base?useUnicode=true&characterEncoding=utf8&serverTimezone=PRC&useSSL=false',
'username' = 'root',
'password' = 'root1234',
'table-name' = 'latest_rates',
'lookup.cache.max-rows' = '1',
'lookup.cache.ttl' = '1min'
)

SELECT
o.order_id,
o.order_time,
o.amount * r.rate AS amount,
r.currency
FROM orders AS o
LEFT JOIN latest_rates FOR SYSTEM_TIME AS OF o.order_time r
ON o.currency = r.currency

best,
amenhub







Passing Kerberos credentials when stopping a YARN job with the stop command

2020-12-06 Thread amen...@163.com
hi everyone,

When stopping a YARN job with ./bin/flink stop -yid application_xxx_xxx
xx, is there a way to pass Kerberos credentials along?
For example -Dsecurity.kerberos.login.keytab and
-Dsecurity.kerberos.login.principal (I tried both and they did not work).

btw, does stopping a job programmatically behave the same as the command-line stop command? thanks.

best,
amenhub





Re: Using multiple keytabs with Flink

2020-12-03 Thread amen...@163.com
hi,

You can authenticate with UGI when calling executeSql() or execute() (for multiple SQL statements) to submit the job, and you can also pass the two parameters you mentioned dynamically on the run command, in the form of -yD.
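
A sketch of the UGI-based authentication mentioned above, done in the job's main() before the TableEnvironment is built; the principal and keytab path are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class KeytabLoginBeforeSubmit {
    public static void main(String[] args) throws Exception {
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(hadoopConf);

        // Authenticate this client process with a per-user keytab (hypothetical values),
        // independently of the cluster-wide security.kerberos.login.* settings.
        UserGroupInformation.loginUserFromKeytab("user1@EXAMPLE.COM", "/path/to/user1.keytab");

        // ... then build the (Stream)TableEnvironment and call executeSql()/execute() as usual.
    }
}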

best,
amenhub



 
From: zhengmao776
Sent: 2020-12-03 17:16
To: user-zh
Subject: Using multiple keytabs with Flink
Hello, when I submit jobs with flink
run, since the cluster is a Kerberized Hadoop cluster, I would like to provide different keytabs for different users for authentication. I saw the security.kerberos.login.keytab and security.kerberos.login.principal settings in flink-conf.yaml, but they cannot be configured dynamically; I tried setting them with
-yD, but it had no effect. How should I handle this situation? Looking forward to your reply~~~
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: State consistency between Flink and YARN

2020-11-12 Thread amen...@163.com
For some reason the mails I send to the community now keep bouncing with the message below, so the community never actually received them:

newxmmxszc44.qq.com rejected your message to the following email addresses:
m...@zhangzuofeng.cn
Your message couldn't be delivered because the recipient's email system wasn't 
able to confirm that your message came from a trusted location.
For Email Administrators
This error is related to the Sender Policy Framework (SPF). The destination 
email system's evaluation of the SPF record for the message resulted in an 
error. Please work with your domain registrar to ensure your SPF records are 
correctly configured.


Back to the question: I did not add the -d flag when starting the job. After startup, execution.attached is true and execution.target is yarn-per-job.

best,
amenhub



 
From: amen...@163.com
Sent: 2020-11-13 11:30
To: user-zh
Subject: Re: Re: State consistency between Flink and YARN
hi
1. It is definitely per-job mode; the submit command is ./bin/flink run -m yarn-cluster xxx, and in the Flink web UI under Job
Manager -> Configuration the value of execution.target is yarn-per-job.
2. The overall job status is Failed; the TM is gone but the JM is not (presumably the JM survives because the YARN application is still Running, which is also why the failure logs are still visible under Job
Manager -> logs).
 
best,
amenhub
 
 
 
From: JasonLee
Sent: 2020-11-13 11:22
To: user-zh
Subject: Re: State consistency between Flink and YARN
hi
1. First, are you sure you submitted in per-job mode?
2. By "job status" do you mean the JM is still there and the job is failing over, or that the job is really dead and the JM has exited?
-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: State consistency between Flink and YARN

2020-11-12 Thread amen...@163.com
>>> Of course, there is some delay between Flink noticing it has FAILED and reporting that to YARN, and the report may also fail due to network issues and so on.
If that were the cause it should be an occasional thing, but I have been waiting for Flink to report for several hours now and the YARN status is still Running..

>>> Is this a brief window of inconsistency, or has the Flink cluster already exited while the YARN status has not changed?
I am not sure I follow. I submit with ./bin/flink run -m yarn-cluster xxx, and the Flink version is 1.11.1.

Yesterday I found in the community mail archive that before Flink 1.10 the -d flag could be used so that, in per-job mode, when the Flink web
UI shows Failed the Failed state is reported to YARN right away, solving the problem of YARN staying in Running.
It was also mentioned that from Flink 1.10 on, per-job mode uses YarnJobClusterEntrypoint, which is indeed the case; but the problem I am facing is still the same as the pre-1.10 one:
the Flink web UI shows the job has failed, yet the YARN application is still Running.

In addition, I noticed that if the Flink web UI shows the job as Finished, YARN also stays in Running. Is that considered normal? (All the jobs described above are streaming jobs.)

best,
amenhub


 
From: tison
Sent: 2020-11-13 11:01
To: user-zh
Subject: Re: State consistency between Flink and YARN
In per-job mode, once the job has completely failed, the application does report its own failed state to the YARN RM.
 
Of course, there is some delay between Flink noticing it has FAILED and reporting that to YARN, and the report may also fail due to network issues and so on.
 
Is this a brief window of inconsistency, or has the Flink cluster already exited while the YARN status has not changed?
 
Best,
tison.
 
 
zhisheng wrote on Thu, Nov 12, 2020 at 8:17 PM:
 
> I have run into this problem too, so for job monitoring and alerting we usually look at all the task-level states of the job rather than simply the YARN state.
>
> hdxg1101300123 wrote on Thu, Nov 12, 2020 at 8:07 PM:
>
> > You can configure the job to fail when checkpoints fail.
> >
> >
> >
> > 发自vivo智能手机
> > > hi everyone,
> > >
> > > 最近在使用Flink-1.11.1 On Yarn Per
> > Job模式提交简单的kafka->mysql任务时,发现当有脏数据或mysql主键限制等等原因导致Flink任务状态置为Failed时,Yarn
> > application仍处于运行状态
> > >
> > > 疑问是Flink任务处于Failed或Finished时,不会反馈自身状态给Yarn吗?期待大佬解惑,谢谢
> > >
> > > best,
> > > amenhub
>


Re: Re: Flink 1.9.1 job has failed but the application is still running on YARN

2020-11-12 Thread amen...@163.com
hi,

I am on flink-1.11.1 without the -d flag and I am hitting the same problem. What could be going on?

best,
amenhub



 
From: Yang Wang
Sent: 2020-08-05 10:28
To: user-zh
Subject: Re: Flink 1.9.1 job has failed but the application is still running on YARN
Your Flink job was most likely started in attached mode, i.e. without -d. Before 1.10, a job started this way is essentially a session,
which only exits after the client has retrieved the result; if the client dies or you stop it yourself, an empty session is left behind.
 
You can confirm the session mode from a log line like the following:
 
2020-08-04 10:45:36,868 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
Starting YarnSessionClusterEntrypoint (Version: 1.9.1, Rev:f23f82a,
Date:01.11.2019 @ 11:20:33 CST)
 
 
You can use flink run -d ... to get per-job mode, or upgrade to 1.10 or later, where both attached and detached submissions are true per-job clusters.
 
 
Best,
Yang
 
bradyMk  于2020年8月4日周二 下午8:04写道:
 
> Hello:
> Is this a bug in this version of Flink itself? Does that mean there is no way around it other than killing the application manually?
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


State consistency between Flink and YARN

2020-11-11 Thread amen...@163.com
hi everyone,


Recently, when submitting a simple kafka->mysql job with Flink-1.11.1 on YARN in per-job
mode, I noticed that when dirty data or a MySQL primary-key violation (or similar) puts the Flink job into the Failed state, the YARN
application is still in the running state.


My question: when a Flink job is Failed or Finished, does it not report its own state back to YARN? Hoping someone can clarify, thanks.


best,
amenhub

Chaining Flink SQL statements

2020-11-09 Thread amen...@163.com
hi everyone,

Is there a pattern in Flink SQL where the output of one SQL statement is the input of the next?
For example KafkaSource -> SQL_1 -> SQL_2 -> MysqlSink, all chained together and submitted as a single job~
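
One common way to express this with the Table API (a sketch, with datagen/print stand-ins for the real Kafka source and MySQL sink): register the result of SQL_1 as a temporary view and let SQL_2 read from it, so the planner compiles the whole chain into a single job:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ChainedSql {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Stand-ins for the real Kafka source and MySQL sink tables.
        tEnv.executeSql("CREATE TABLE kafka_source (id INT, v INT) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE mysql_sink (id INT, v INT) WITH ('connector' = 'print')");

        // SQL_1: register its result as a temporary view ...
        tEnv.createTemporaryView("sql_1_result",
                tEnv.sqlQuery("SELECT id, v * 2 AS v FROM kafka_source"));

        // SQL_2: consumes the view; source -> SQL_1 -> SQL_2 -> sink becomes one job graph.
        tEnv.executeSql("INSERT INTO mysql_sink SELECT id, v FROM sql_1_result");
    }
}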

best,
amenhub



Reading and writing Kerberos-secured Kafka with Flink SQL

2020-11-05 Thread amen...@163.com
hi everyone,

I would like to ask the community: when connecting through the Flink Table
API to a Kafka cluster with Kerberos authentication, how do you handle topic and group authorization on the Kafka cluster?
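
For reference, a sketch of what the DDL side can look like for a Kerberized cluster, assuming security.kerberos.login.keytab/principal and security.kerberos.login.contexts: Client,KafkaClient are set in flink-conf.yaml; the topic, broker and group are placeholders, and the topic/group ACLs themselves have to be granted to the principal on the Kafka side:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KerberizedKafkaTable {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql(
                "CREATE TABLE kerberized_kafka (" +
                "  msg STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'secured_topic'," +
                "  'properties.bootstrap.servers' = 'broker1:9092'," +
                "  'properties.group.id' = 'secured_group'," +
                // 'properties.*' keys are handed to the Kafka client as-is
                "  'properties.security.protocol' = 'SASL_PLAINTEXT'," +
                "  'properties.sasl.mechanism' = 'GSSAPI'," +
                "  'properties.sasl.kerberos.service.name' = 'kafka'," +
                "  'format' = 'json'" +
                ")");
    }
}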

best,
amenhub





Kerberos issue in the YARN deployment mode

2020-11-03 Thread amen...@163.com
hi everyone,

Recently, when submitting jobs to a YARN queue in per-job mode with flink-1.11.1, I ran into a Kerberos authentication problem.

Details: on the client I authenticate a specific user with Kerberos and submit the Flink job to the YARN queue; the submission succeeds. But when YARN schedules the job onto a node for execution, the error suggests that the task needs to access HDFS (to create the checkpoint and savepoint directories, since I use the FileSystem state backend) and does not have permission to do so, so it is blocked by Kerberos as expected.

Searching the community mailing list, I learned that the -yD parameters can avoid this problem. But in theory, once the client has authenticated and the job has been submitted to YARN successfully, shouldn't the permissions carry over to both the submitting node and the executing nodes?

Are -yD security.kerberos.login.principal=xxx and -yD
security.kerberos.login.keytab=xxx intended purely to solve this kind of problem? Please help clarify~

best,
amenhub


Re: Re: Loading external JARs for UDF registration in Flink 1.11

2020-10-22 Thread amen...@163.com
Yes, exactly as @chenxuying and @zhisheng said,

The approach I took is to add the required UDF jars to the classpath through the pipeline.classpaths option. However, when a task is scheduled onto a TM it still needs to be able to find the UDF
jar, so on 1.11 I use the -yt parameter to upload the /plugins directory to HDFS, which solves the problem~

best,
amenhub



 
发件人: zhisheng
发送时间: 2020-10-22 23:28
收件人: user-zh
主题: Re: flink1.11加载外部jar包进行UDF注册
hi
 
In Flink 1.11, if you want to manage UDF jars you should be able to control the path of the UDF
jars through the yarn-provided-lib-dirs option [1]; note that this option is only supported from 1.11.
 
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#yarn-provided-lib-dirs
 
Best
zhisheng
 
Husky Zeng <568793...@qq.com> 于2020年10月22日周四 上午11:31写道:
 
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-way-to-avoid-submit-hive-udf-s-resources-when-we-submit-a-job-td38204.html
>
>
>
https://issues.apache.org/jira/browse/FLINK-19335?focusedCommentId=17199927&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17199927
>
>
>
> We are also working on a feature that loads UDFs from an HDFS path; could you check whether it is the same problem? Happy to discuss.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: Using the Flink-1.11.1 REST API

2020-10-22 Thread amen...@163.com
It really isn't supported, thanks for clearing that up~



 
From: Peidian Li
Sent: 2020-10-22 19:13
To: user-zh
Subject: Re: Using the Flink-1.11.1 REST API
YARN's proxy server does not support POST requests (a YARN colleague sent me a screenshot of this a few days ago).
We patched the proxy server logic to support POST requests and then it worked.


An example of stop with savepoint:
http://zjy-hadoop-prc-ct11.bj:21001/proxy/application_1600936402499_375893/jobs/790e4740baa52b43c0ceb9a5cdaf6135/stop?proxyapproved=true

Request body:
{
"drain" : true,
"targetDirectory" : "hdfs://zjyprc-hadoop/user/s_flink_tst/checkpoints4"
}

Response:
{
"request-id": "69416efc4538f56759f77a3001c38ff8"
}
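
The same stop-with-savepoint request expressed as a plain HTTP POST against the JobManager's own REST port, bypassing the YARN proxy that rejects POST; host and port are placeholders, the job id and target directory are the ones from the example above:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class StopWithSavepointRest {
    public static void main(String[] args) throws Exception {
        // Placeholder host/port; the job id is the one from the example above.
        URL url = new URL("http://jobmanager-host:8081/jobs/790e4740baa52b43c0ceb9a5cdaf6135/stop");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);

        String body = "{\"drain\": true, \"targetDirectory\": \"hdfs:///user/s_flink_tst/checkpoints4\"}";
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("HTTP " + conn.getResponseCode()); // a request-id JSON body is returned on success
    }
}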

2020年10月22日 下午2:30,Husky Zeng <568793...@qq.com> 写道:

Most of the other endpoints are not POST; you need to switch to GET or whatever the endpoint requires. It is worth carefully reading the description at the top of the page you linked first, and checking whether the deployment is set up correctly.



--
Sent from: http://apache-flink.147419.n8.nabble.com/



Using the Flink-1.11.1 REST API

2020-10-21 Thread amen...@163.com
hi everyone,

How should the parameters for the REST API shown in [1] be set and tested in Postman? What confuses me is how a Query
Parameter such as mode should be passed in the URL. An example would be much appreciated.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/rest_api.html#jobs-jobid-1

best,
amenhub


Re: Re: Custom Flink UDF cannot be used after registration

2020-10-16 Thread amen...@163.com
Yes, the same TEMPORARY FUNCTION error here, but with TEMPORARY SYSTEM there is no problem; I am not sure whether this is a Flink bug.

best,
amenhub
 
From: 史 正超
Sent: 2020-10-16 15:26
To: user-zh@flink.apache.org
Subject: Re: Re: Re: Custom Flink UDF cannot be used after registration
Try creating it like this, or try a different name:
 
CREATE TEMPORARY SYSTEM FUNCTION imei_encrypt AS 
'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA;
 
I just created an UpperCase function and hit the same error; overriding the (possibly existing) built-in function with TEMPORARY SYSTEM made it work, and using a different name also works.
 

发件人: 奔跑的小飞袁 
发送时间: 2020年10月16日 6:47
收件人: user-zh@flink.apache.org 
主题: Re: 回复:回复: flink 自定义udf注册后不能使用
 
Yes, it was a problem with the arguments I was passing.
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Loading external JARs for UDF registration in Flink 1.11

2020-10-15 Thread amen...@163.com
A follow-up question: when using the thread context classloader, each record is sent three times. Is this caused by adding pipeline.classpaths?
And what other problems could this way of configuring the env potentially cause?

best,
amenhub
 
发件人: amen...@163.com
发送时间: 2020-10-15 19:22
收件人: user-zh
主题: Re: Re: flink1.11加载外部jar包进行UDF注册
Thank you very much for the reply!
 
For me this is a very good approach, but I have one question: does the class loading have to go through the system classloader?
When I tried the system classloader, the interfaces that my job jar exposes to the external UDF
jar threw ClassNotFoundException, whereas pointing the classloader at the main class (which, if I understand correctly, means using the default thread context classloader) avoids the problem.
 
Looking forward to your reply, thanks~
 
best, 
amenhub
发件人: cxydeve...@163.com
发送时间: 2020-10-15 17:46
收件人: user-zh
主题: Re: flink1.11加载外部jar包进行UDF注册
The approach we use is to set the env configuration via reflection, adding pipeline.classpaths.
The code is as follows:
public static void main(final String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(env, settings);
//String path = "file:///D:/cxy/idea_workspace/...xxx.jar";
String path = "https://...xxx.jar";
loadJar(new URL(path));
Field configuration =
StreamExecutionEnvironment.class.getDeclaredField("configuration");
configuration.setAccessible(true);
Configuration o = (Configuration)configuration.get(env);
Field confData = Configuration.class.getDeclaredField("confData");
confData.setAccessible(true);
Map<String, Object> temp = (Map<String, Object>) confData.get(o);
List<String> jarList = new ArrayList<>();
jarList.add(path);
temp.put("pipeline.classpaths",jarList);
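// pipeline.classpaths URLs are added to the job's user classpath on the JM and TMs
// (they must be reachable from the cluster nodes), which is what lets the UDF class resolve at runtime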
tableEnvironment.executeSql("CREATE FUNCTION CxyTestReturnSelf AS
'flinksql.function.udf.CxyTestReturnSelf'");
tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
" f_sequence INT,\n" +
" f_random INT,\n" +
" f_random_str STRING,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='5',\n" +
"\n" +
" 'fields.f_sequence.kind'='sequence',\n" +
" 'fields.f_sequence.start'='1',\n" +
" 'fields.f_sequence.end'='1000',\n" +
"\n" +
" 'fields.f_random.min'='1',\n" +
" 'fields.f_random.max'='1000',\n" +
"\n" +
" 'fields.f_random_str.length'='10'\n" +
")");
tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
"f_random_str STRING" +
") WITH (\n" +
"'connector' = 'print'\n" +
")");
tableEnvironment.executeSql(
"insert into sinktable " +
"select CxyTestReturnSelf(f_random_str) " +
"from sourceTable");
}
// Dynamically load a JAR at runtime
public static void loadJar(URL jarUrl) {
// Get the addURL method from the URLClassLoader class
Method method = null;
try {
method = URLClassLoader.class.getDeclaredMethod("addURL",
URL.class);
} catch (NoSuchMethodException | SecurityException e1) {
e1.printStackTrace();
}
// Remember the method's current accessibility
boolean accessible = method.isAccessible();
try {
// Make the method accessible
if (accessible == false) {
method.setAccessible(true);
}
// Get the system classloader
URLClassLoader classLoader = (URLClassLoader)
ClassLoader.getSystemClassLoader();
// Add the JAR path to the system URL classpath
method.invoke(classLoader, jarUrl);
} catch (Exception e) {
e.printStackTrace();
} finally {
method.setAccessible(accessible);
}
}
--
Sent from: http://apache-flink.147419.n8.nabble.com/



Loading external JARs for UDF registration in Flink 1.11

2020-10-13 Thread amen...@163.com
hi, everyone

I have recently been working on loading UDFs from an external path, but hit the following exception (main part of the stack trace excerpted):

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user 
class: com.xxx.xxx.udf.Uppercase
ClassLoader info: URL ClassLoader:
file: 
'/tmp/blobStore-1318a525-1b5a-4c07-808e-f62083c3fb11/job_a5501605ff554915a81ae12e3018e77d/blob_p-b0411adc6fb3d602ed03076ddc3d1bf3e6a63319-48d1e8f3c1b25d4e2b78242429834e31'
 (valid JAR)
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:288)
 ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471)
 ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
 ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:155)
 ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
 ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) 
~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
[flink-dist_2.11-1.11.1.jar:1.11.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: java.lang.ClassNotFoundException: com.xxx.xxx.udf.Uppercase
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_171]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_171]
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_171]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_171]
at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_171]

My guess is that when the external jar is loaded and createTemporarySystemFunction is called, the external jar has not been loaded into the Flink runtime environment; but I am still quite unsure about this guess. I am a beginner here, so I would appreciate any solutions or ideas, thanks~

best,
amenhub



Re: Re: Flink 1.11 MySQL connection problem

2020-08-31 Thread amen...@163.com
For MySQL 5.x and above, the autoReconnect parameter in the URL has no effect, I believe.

You could try adjusting the wait_timeout / interactive_timeout settings in the MySQL configuration instead.

best,
amenhub
 
From: 酷酷的浑蛋
Sent: 2020-08-31 20:48
To: user-zh@flink.apache.org
Subject: Re: Flink 1.11 MySQL connection problem
 
 
Below is my MySQL connection configuration, on flink-1.11.1; it still reports that error.
CREATE TABLE xx(
  `xx` varchar,
  `xx` varchar
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xx/xx?autoReconnect=true=false',
'table-name' = ‘xx',
'driver' = 'com.mysql.jdbc.Driver',
'username' = ‘xx',
'password' = ‘xx',
'scan.partition.column' = 'id',
'scan.partition.num' = '50',
'scan.partition.lower-bound' = '500',
'scan.partition.upper-bound' = '1000',
'scan.fetch-size' = '100',
'lookup.cache.max-rows' = '5000',
'lookup.cache.ttl' = '10s'
);
On 2020-08-31 at 17:33, Leonard Xu wrote:
 
 
On 2020-08-28 at 15:02, 酷酷的浑蛋 wrote:
 
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet 
successfully received from the server was 52,445,041 milliseconds ago. The last 
packet sent successfully to the server was 52,445,045 milliseconds ago. is 
longer than the server configured value of'wait_timeout'. You should consider 
either expiring and/or testing connection validity before use in your 
application, increasing the server configured values for client timeouts, 
orusing the Connector/J connection property 'autoReconnect=true' to avoid this 
problem.
 
 
 
 
Flink's MySQL connection gets dropped after some period of time, and setting 'autoReconnect=true' does not help.
 
 
 
Hi
 
The timeout-disconnect problem should already be fixed in 1.11 [1]. How are you using it? Could you provide more information?
 
Best
Leonard
[1]https://issues.apache.org/jira/browse/FLINK-16681 

 


Re: flink-1.11.1 Table API / SQL cannot write to a Hive ORC table

2020-08-31 Thread amen...@163.com
hi Jian Wang,

As I understand it, putting the official flink-sql-connector-hive-2.2.0_2.11-1.11.1.jar into the flink
lib directory means the dependency can be used with any Hive version in the [2.0.0, 2.2.0] range.

I have run into your problem before, also with Hive 2.1.1. My demo based on [1] ran successfully without additionally importing flink-sql-connector-hive-2.2.0_2.11-1.11.1.jar;
it is enough to mark the dependencies in [1] as provided and put the corresponding jars into flink/lib.

Hope this helps,

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#program-maven

best,
amenhub

 
发件人: Jian Wang
发送时间: 2020-08-31 21:55
收件人: user-zh
主题: flink-1.11.1 Table API /SQL 无法写入hive orc表
Hi all,
 
I am on flink 1.11 + hadoop 3.0.0 + hive 2.1.1, Flink on YARN, executing Flink SQL through the Table API in a streaming
job to write into Hive tables in real time.
 
I configured things according to the documentation
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/
 
and I am now running into dependency problems between Flink and Hive.
 
 
In the project pom, all Hive-related dependencies are provided, and flink-sql-connector-hive-2.2.0_2.11-1.11.1.jar is placed under flink
lib.
When submitting the job, a conflict with hive-exec.jar causes java.lang.NoClassDefFoundError: Could not
initialize class org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
(my Hive is 2.1.1; since Flink does not provide a 2.1.1 build of flink-sql-connector-hive, I used the closest one, 2.2.0).
 
I then tried, for my Hive version 2.1.1,
to change hive-exec to 2.1.1 in the Flink source and manually build flink-sql-connector-hive-2.1.1_2.11-1.11.1.jar and put it under flink
lib,
but then orc-core-1.4.3 inside flink-sql-connector-hive conflicts with hive-exec-2.1.1: java.lang.NoSuchMethodError:
 
org.apache.orc.TypeDescription.fromString(Ljava/lang/String;)Lorg/apache/orc/TypeDescription;
 
I see that flink-sql-connector-hive indeed has no 2.1.1 build. Is that version incompatible with Flink? Or is there a working reference for integrating Flink
1.11 with Hive 2.1.1?
 
Thanks
 
 
王剑 


Testing Flink-1.11.1 application-mode submission

2020-08-25 Thread amen...@163.com
hi, everyone

After uploading all the jars to HDFS, I submit in application mode with the following command:

./bin/flink run-application -t yarn-application 
-Dyarn.provided.lib.dirs="hdfs:///user/flink/lib" -c 
com.yui.flink.demo.Kafka2Mysql hdfs:///user/flink/app_jars/kafka2mysql.jar

The following exception is thrown:

 The program finished with the following exception:

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy 
Yarn Application Cluster
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:414)
at 
org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:64)
at 
org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:197)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:919)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: 
The YARN application unexpectedly switched to state FAILED during deployment. 
Diagnostics from YARN: Application application_1598223665550_0009 failed 1 
times (global limit =2; local limit is =1) due to AM Container for 
appattempt_1598223665550_0009_01 exited with  exitCode: -1
Failing this attempt.Diagnostics: [2020-08-25 15:12:48.975]Destination must be 
relative
For more detailed output, check the application tracking page: 
http://ck233:8088/cluster/app/application_1598223665550_0009 Then click on 
links to logs of each attempt.
. Failing the application.
If log aggregation is enabled on your cluster, use this command to further 
investigate the issue:
yarn logs -applicationId application_1598223665550_0009
at 
org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1021)
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:524)
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:407)
... 9 more

There are no other errors; submitting with run -m yarn-cluster works fine.

best,
amenhub


Re: Re: Hadoop conflict caused by the hive-exec dependency

2020-08-24 Thread amen...@163.com
OK, thanks for the reply.

With the Hive version set to 2.1.1, I chose to add the hive-exec-2.1.1 and flink-connector-hive_2.11-1.11.1 dependencies to the program, and Hive
tables can now be operated on normally.

best,
amenhub

 
发件人: Rui Li
发送时间: 2020-08-24 21:33
收件人: user-zh
主题: Re: hive-exec依赖导致hadoop冲突问题
Hi,
 
hive-exec itself does not contain Hadoop. If Hadoop is pulled in as a transitive Maven dependency, you can exclude it at packaging time. At runtime you can use your cluster's Hadoop version rather than the Hadoop version Hive itself depends on. For Flink
1.11 you can also consider the official flink-sql-connector-hive uber
jar, which bundles all Hive dependencies (the Hadoop dependency still needs to be added separately). For more details see the docs [1][2].
 
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#dependencies
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html#providing-hadoop-classes
 
On Mon, Aug 24, 2020 at 9:05 PM amen...@163.com  wrote:
 
>
> 补充一下,当我移除hive-exec等程序中的hadoop依赖时,任务依旧异常,所以也许是我哪个地方没有到位,觉得依赖冲突是因为在测试hive集成之前,我提交过到yarn执行并无异常,所以排查思路来到了hive这里,
> 现在看来,可能是另外某个原因导致的,贴一点点异常栈如下:
>
> Caused by: org.apache.flink.client.deployment.ClusterDeploymentException:
> Could not deploy Yarn job cluster.
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
> at
> org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
> at
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
> at
> org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
> ... 19 more
> Caused by: java.lang.ClassCastException:
> org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto
> cannot be cast to org.apache.hadoop.hbase.shaded.com.google.protobuf.Message
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
> at com.sun.proxy.$Proxy63.getClusterNodes(Unknown Source)
> at
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:311)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
> at com.sun.proxy.$Proxy64.getClusterNodes(Unknown Source)
> at
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:618)
> at
> org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:43)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:280)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:480)
>         at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:424)
> ... 24 more
>
> best,
> amenhub
>
> 发件人: amen...@163.com
> 发送时间: 2020-08-24 20:40
> 收件人: user-zh
> 主题: hive-exec依赖导致hadoop冲突问题
> hi, everyone
>
> 组件版本:flink-1.11.1,hive-2.1.1
>
> 问题描述:
> 使用Table
> API调用executeSql()方法编写kafka2mysql实时程序demo,在未导入hive-exec依赖时,打包提交到yarn集群,正常运行;
>
> 当测试HiveCatalog及读写Hive Table时,Standalone Cluster运行无异常,在flink端正常读写hive
> table(不会发生hadoop依赖冲突);
>
> 但当提交到yarn时发生hadoop冲突,通过IDEA查看程序依赖得知,当引入hive-exec依赖时,会自动的带入hadoop和hdfs相关的版本为2.6.1的依赖包,从而导致和yarn集群(hadoop-3.0.0-cdh-6.2.0)的hadoop等依赖包冲突;
>
>
> 请问社区有碰到这种情况吗?doc中建议没有官方指定的hive包时选择自有版本下载hive-exec依赖,这种情况下却隐式的引入了非集群版本的hadoop依赖,势必会造成冲突,这是我这边哪里设置的不到位吗?
>
> best,
> amenhub
>
 
 
-- 
Best regards!
Rui Li


Hadoop conflict caused by the hive-exec dependency

2020-08-24 Thread amen...@163.com
hi, everyone

Component versions: flink-1.11.1, hive-2.1.1

Problem description:
I wrote a kafka2mysql real-time demo with the Table API using executeSql(). Without the hive-exec dependency, it packages, submits to the YARN cluster and runs fine.

When testing the HiveCatalog and reading/writing Hive tables, it runs without problems on a standalone cluster, and Hive
tables can be read and written normally from Flink (no Hadoop dependency conflicts).
But when submitted to YARN a Hadoop conflict occurs. Inspecting the program dependencies in IDEA shows that adding hive-exec automatically pulls in Hadoop/HDFS dependencies at version 2.6.1, which then conflict with the Hadoop dependencies of the YARN cluster (hadoop-3.0.0-cdh-6.2.0).

Has the community run into this? The docs suggest downloading the hive-exec dependency for your own version when there is no officially provided Hive bundle, but in that case it implicitly pulls in a Hadoop version different from the cluster's, which is bound to conflict. Is there something I have not configured properly?

best,
amenhub


Re: Re: Flink 1.11 CDC questions

2020-07-24 Thread amen...@163.com
Thanks a lot! Following the issue~


Best


amen...@163.com
 
From: Leonard Xu
Sent: 2020-07-24 16:20
To: user-zh
Subject: Re: Flink 1.11 CDC questions
Hi amenhub
 
I have created an issue to track this problem [1].
In addition, you can set the table's IDENTITY to FULL in your PG, so that the UPDATE records synchronized by Debezium carry complete information.
The DB command is: ALTER TABLE yourTable REPLICA IDENTITY FULL; see the Debezium documentation [2].
 
Best
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-18700 
<https://issues.apache.org/jira/browse/FLINK-18700>
[2] https://debezium.io/documentation/reference/1.2/connectors/postgresql.html 
<https://debezium.io/documentation/reference/1.2/connectors/postgresql.html>
 
> 在 2020年7月23日,09:14,amen...@163.com 写道:
>
> 感谢二位大佬@Leonard, @Jark的解答!
>
>
>
> amen...@163.com
>
> 发件人: Jark Wu
> 发送时间: 2020-07-22 23:56
> 收件人: user-zh
> 主题: Re: flink 1.11 cdc相关问题
> Hi,
>
> 这是个已知问题,目前 debezium 同步不同数据库并没有保证一模一样地数据格式,比如同步 PG 的UPDATE消息时候,before 和
> after 字段就不是全的。
> 这个问题会在后面地版本中解决。
>
> Best,
> Jark
>
> On Wed, 22 Jul 2020 at 21:07, Leonard Xu  wrote:
>
>> Hello,
>>
>> 代码在为before这条数据设置rowKind时抛了一个NPE,before正常应该是不为null的。
>> 看起来是你的数据问题,一条 update 的changelog, before 为null,
>> 这是不合理的,没有before的数据,是无法处理after的数据的。
>> 如果确认是脏数据,可以开启ignore-parse-errors跳过[1]
>>
>> 祝好
>> Leonard
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>> <
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>>>
>>
>> {
>>"payload": {
>>"before": null,
>>"after": {
>>"id": 2,
>>"name": "liushimin",
>>"age": "24",
>>"sex": "man",
>>"phone": "1"
>>},
>>"source": {
>>"version": "1.2.0.Final",
>>"connector": "postgresql",
>>            "name": "postgres",
>>"ts_ms": 1595409754151,
>>"snapshot": "false",
>>"db": "postgres",
>>"schema": "public",
>>"table": "person",
>>"txId": 569,
>>"lsn": 23632344,
>>"xmin": null
>>},
>>"op": "u",
>>"ts_ms": 1595409754270,
>>"transaction": null
>>}
>> }
>>
>>> 在 2020年7月22日,17:34,amen...@163.com 写道:
>>>
>>>
>> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"1"},"source":{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}
>>
>>
 


Flink 1.11 CDC questions

2020-07-22 Thread amen...@163.com
res","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}'.
at 
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
 ~[flink-json-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
 ~[flink-connector-kafka-base_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
 ~[flink-connector-kafka_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
 ~[flink-connector-kafka_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
 ~[flink-connector-kafka-base_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: java.lang.NullPointerException
at 
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:120)
 ~[flink-json-1.11.0.jar:1.11.0]
... 7 more
2020-07-22 17:22:34,415 INFO org.apache.flink.runtime.taskmanager.Task [] - 
Freeing task resources for Source: TableSourceScan(table=[[default_catalog, 
default_database, pgsql_person_cdc]], fields=[id, name, age, sex, phone]) -> 
Sink: Sink(table=[default_catalog.default_database.sql_out], fields=[id, name, 
age, sex, phone]) (1/1) (b553cb66df6e47a27e7dae8466b684ab).
2020-07-22 17:22:34,418 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Un-registering task and sending final execution state FAILED to JobManager 
for task Source: TableSourceScan(table=[[default_catalog, default_database, 
pgsql_person_cdc]], fields=[id, name, age, sex, phone]) -> Sink: 
Sink(table=[default_catalog.default_database.sql_out], fields=[id, name, age, 
sex, phone]) (1/1) b553cb66df6e47a27e7dae8466b684ab.
2020-07-22 17:22:34,461 INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
TaskSlot(index:0, state:ACTIVE, resource profile: 
ResourceProfile{cpuCores=1., taskHeapMemory=384.000mb 
(402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb 
(536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: 
495bb5a0cd877808674b29890b6b8bc0, jobId: 3feda3a191fcb8e0da891b9fda1ee532).
2020-07-22 17:22:34,462 INFO 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 
3feda3a191fcb8e0da891b9fda1ee532 from job leader monitoring.
2020-07-22 17:22:34,462 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Close JobManager connection for job 3feda3a191fcb8e0da891b9fda1ee532.

========== divider ==========

best!



amen...@163.com
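
For reference, a minimal sketch of a table declaration for a Debezium message like the sample above. It is not the original poster's DDL: the topic name, broker address, and group id are placeholders, and it assumes the flink-json version in use supports the 'debezium-json.schema-include' option. The sample message carries the Kafka Connect "schema"/"payload" envelope, which the debezium-json format cannot read unless it is told the schema is included; the alternative is to set value.converter.schemas.enable=false on the Kafka Connect side so that the envelope is never produced.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class DebeziumCdcSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings settings =
                    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

            // Topic name, broker address and group id are placeholders, not the
            // original poster's settings. Column names and types follow the sample payload.
            // The sample message includes the Kafka Connect "schema"/"payload" envelope,
            // hence 'debezium-json.schema-include' = 'true'.
            tEnv.executeSql(
                    "CREATE TABLE pgsql_person_cdc (" +
                    "  id INT," +
                    "  name STRING," +
                    "  age STRING," +
                    "  sex STRING," +
                    "  phone STRING" +
                    ") WITH (" +
                    "  'connector' = 'kafka'," +
                    "  'topic' = 'postgres.public.person'," +
                    "  'properties.bootstrap.servers' = 'localhost:9092'," +
                    "  'properties.group.id' = 'cdc-demo'," +
                    "  'format' = 'debezium-json'," +
                    "  'debezium-json.schema-include' = 'true'" +
                    ")");
        }
    }
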


flink-1.11: problem executing DDL statements with executeSql()

2020-07-14 Thread amen...@163.com
hi, everyone

Environment: flink-1.11.0, blink planner, local IDE development and testing (IDEA)

Problem description: when a DDL statement is executed with the executeSql() method, the console prints the exception below. (The flink-connector-kafka_2.11 dependency has already been added.)

I am not sure whether some required dependency is still missing or whether I have overlooked something else; any advice would be appreciated.

---------- divider ----------
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Unable to create a source for reading table 
'default_catalog.default_database.kafka_out'.

Table options are:

'connector'='kafka'
'format'='json'
'properties.bootstrap.servers'='localhost:9092'
'properties.group.id'='flink-1.11'
'scan.startup.mode'='group-offsets'
'topic'='flink-kafka'
at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
at example.Example.main(Example.java:77)
Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
factories that implement 
'org.apache.flink.table.factories.DeserializationFormatFactory' in the 
classpath.
at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:229)
at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalFormatFactory(FactoryUtil.java:538)
at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalDecodingFormat(FactoryUtil.java:426)
at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverDecodingFormat(FactoryUtil.java:413)
at 
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactoryBase.createDynamicTableSource(KafkaDynamicTableFactoryBase.java:73)
at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
... 25 more

---------- divider ----------

Best!
amenhub
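
The root cause is named in the last frames: no factory implementing DeserializationFormatFactory could be found, which usually means the jar that provides 'format' = 'json' (org.apache.flink:flink-json, a separate artifact from flink-connector-kafka) is not on the classpath, or its SPI registration got lost. A hedged diagnostic sketch (the class name FormatFactoryCheck is made up for illustration) that prints every table factory the SPI machinery can actually see:

    import java.util.ServiceLoader;

    import org.apache.flink.table.factories.Factory;

    public class FormatFactoryCheck {
        public static void main(String[] args) {
            // The planner resolves 'format' = 'json' through Java SPI: every connector and
            // format jar ships a META-INF/services/org.apache.flink.table.factories.Factory
            // file. Listing what is discoverable on the classpath shows whether the JSON
            // format (identifier "json") is reachable at all.
            for (Factory factory : ServiceLoader.load(Factory.class)) {
                System.out.println(factory.factoryIdentifier() + "\t" + factory.getClass().getName());
            }
        }
    }

If no factory with identifier "json" appears, adding flink-json in the same version as the other Flink modules normally resolves the error; when building a shaded jar, the META-INF/services files also need to be merged (for example with the maven-shade-plugin ServicesResourceTransformer), otherwise one jar's factory list overwrites another's.
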


Re: Re: Metaspace out-of-memory in standalone mode

2020-06-04 Thread amen...@163.com
Hi,

The community has already seen a number of metaspace-leak issues; some of them have been fixed,
while others are caused by third-party dependencies. Since your image does not show up, I cannot tell whether you are hitting the same problem.

In addition, the upcoming 1.11 release improves this area: jobs run with a dedicated ClassLoader, so
metaspace is no longer leaked after a job finishes. 1.11.0 has entered pre-release testing and RC1 is already available; feel free to try it out.

Thank you~

Xintong Song



On Fri, Jun 5, 2020 at 9:29 AM 胡泽康  wrote:

> Flink version: 1.10
> In standalone mode, after the same batch job is submitted several times, the TaskManager runs out of metaspace.
>
> Observed with VisualVM: after several submissions the TaskManager metaspace looks like the figure below:
>
> The JVM parameter can be raised at startup as a workaround, but this still looks like a bug.
>
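
For reference, the workaround mentioned above maps to a single configuration key in the FLIP-49 memory model (Flink 1.10+). A minimal sketch that only documents the key; the 512m value is an arbitrary example, and in a standalone cluster the same entry belongs in conf/flink-conf.yaml followed by a TaskManager restart. Raising it buys headroom but does not remove the underlying class-loading leak.

    import org.apache.flink.configuration.Configuration;

    public class MetaspaceSizeSketch {
        public static void main(String[] args) {
            // Builds the TaskManager metaspace setting programmatically just to show the key;
            // on a standalone cluster the same key/value pair goes into conf/flink-conf.yaml.
            Configuration conf = new Configuration();
            conf.setString("taskmanager.memory.jvm-metaspace.size", "512m"); // arbitrary example value
            System.out.println(conf); // print the resulting entry
        }
    }
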


Re: Re: flink savepoint checkpoint

2020-01-10 Thread amen...@163.com
Hi, I understand that stopping a job with stop triggers a savepoint, emits MAX_WATERMARK before stopping, and fires the registered event-time timers. My question is: when a job is killed directly through YARN (yarn kill), does that count as cancel, as stop, or as something else?



amen...@163.com
 
From: Congxian Qiu
Date: 2020-01-10 17:16
To: user-zh
Subject: Re: flink savepoint checkpoint
Hi
From Flink's point of view, checkpoints are used to recover when a failover happens while a job is
running, whereas savepoints are used to reuse state across jobs.
Additionally, starting from 1.9 you can try StopWithSavepoint [1]; there is also another community issue
working on StopWithCheckpoint [2]
 
[1] https://issues.apache.org/jira/browse/FLINK-11458
[2] https://issues.apache.org/jira/browse/FLINK-12619
Best,
Congxian
 
 
zhisheng  wrote on Fri, Jan 10, 2020 at 11:39 AM:
 
> Hi, as I understand it, that setting controls whether the previous checkpoints are cleaned up when the job is cancelled, but such a checkpoint
> does not necessarily hold the job's latest state. If, instead, a savepoint is triggered as part of the cancel command, that state is the latest and most complete one.
>
> Best!
> zhisheng
>
Px New <15701181132mr@gmail.com> wrote on Fri, Jan 10, 2020 at 10:58 AM:
>
> > Hello, regarding your question: Flink does have a configuration for this. When the program is stopped, the checkpoint is additionally retained
> > -->
> >
> >
> >
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >
> >
lucas.wu  wrote on Wed, Dec 11, 2019 at 11:56 AM:
> >
> > > Hi all:
> > >
> > >
> >
> I have a question to discuss with everyone: why are Flink savepoints manual? If no savepoint is taken when the program is stopped, the previously saved state cannot be used on restart. Why not follow Spark's approach: take checkpoints periodically, and on startup pass the checkpoint path to continue from where the last run stopped.
> >
>
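
Pulling the replies in this thread together, a minimal sketch of the retained-checkpoint setup discussed above. The 60-second interval and the job name are arbitrary, and state.checkpoints.dir has to point at a durable filesystem such as HDFS for the retained checkpoint to outlive the processes.

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RetainedCheckpointSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Periodic checkpoints every 60s: the "automatic" snapshots asked about above.
            env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

            // Keep the last completed checkpoint on the checkpoint filesystem even after the
            // job is cancelled or the YARN application is killed outright. The retained
            // directory can then be passed to the next submission:
            //   bin/flink run -s <checkpointDir>/chk-<n> ...
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            env.fromElements(1, 2, 3).print();
            env.execute("retained-checkpoint-sketch");
        }
    }

With RETAIN_ON_CANCELLATION, a job that is killed directly through YARN, which takes no savepoint and goes through neither cancel nor stop logic, still leaves its newest chk-<n> directory behind, and the next submission can resume from it with -s. That is effectively the periodic, Spark-like recovery path asked about in the last message; savepoints remain a separate, manually triggered mechanism intended for upgrades and state reuse across jobs.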