Re: flink run命令是否支持读取远程文件系统中的jar文件?

2021-04-22 文章 JasonLee
hi 

session ,per-job 模式是不支持的 application 模式是支持的 



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink job消费kafka 失败,无法拿到offset值

2021-04-22 文章 Qingsheng Ren
你好 Jacob,

从错误上来看是 Kafka Consumer 没有连上 Kafka Brokers。这些方法可能帮助排查问题:

1. 确认 Flink TaskManager 和 Kafka Broker 之间的网络连通性。
2. Flink TaskManager 与 Kafka Broker 之间网络连通并不意味着能够消费数据,可能需要修改 Kafka Broker 
的配置。这篇文章[1] 或许会有帮助,绝大多数 Kafka 的连接问题是由于文章中描述的配置问题导致的。
3. 配置 Log4j 将 org.apache.kafka.clients.consumer 的 Log level 配置为 DEBUG 或 
TRACE,在日志中获取到更多的信息以帮助排查。

希望有所帮助!

[1] 
https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/

—
Best Regards,

Qingsheng Ren
在 2021年4月14日 +0800 PM12:13,Jacob <17691150...@163.com>,写道:
> 有一个flink job在消费kafka topic消息,该topic存在于kafka两个集群cluster A 和Cluster B,Flink
> Job消费A集群的topic一切正常,但切换到消费B集群就启动失败。
>
> Flink 集群采用Docker部署,Standalone模式。集群资源充足,Slot充足。报错日志如下:
>
> java.lang.Exception: org.apache.kafka.common.errors.TimeoutException:
> Timeout of 6ms expired before the position for partition Test-topic-27
> could be determined
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of
> 6ms expired before the position for partition Test-topic-27 could be
> determined
>
> 查询一圈发现基本都是说slot不够之类的原因,已经kafka broker负载等问题,这些问题已经排除。
>
> 请指教
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于upsert-kafka connector的问题

2021-04-22 文章 Shengkai Fang
如果数据在upsert-kafka中已经做到了按序存储(相同key的数据放在同一个partition内),那么flink消费的时候可以做到保序。

Best,
Shengkai


多个复杂算子保证精准一次性

2021-04-22 文章 Colar
您好,

我有如下代码:

datastream.process(new Process1()).process(new Process2())…

这些Process可能有些复杂的计算操作

请问,如果我要保证端到端的精准一次性,我应该在所有的算子上都维护一个状态还是只在最后一个算子维护状态?

Re: Re:回复: flink sql消费kafka join普通表为何会性能爬坡?

2021-04-22 文章 Xi Shen
我这边有使用jdbc table属性加了本地缓存

尝试把cache size设置为400/2/4,然后重启,消费kafka速度都是需要慢慢上涨



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink native k8s ????????

2021-04-22 文章 ??
flink 1.12.2 Native K8s,


 
./bin/kubernetes-session.sh \
 -Dkubernetes.namespace=flink-session-cluster \
 -Dkubernetes.jobmanager.service-account=flink \
 -Dkubernetes.cluster-id=session001 \
 -Dtaskmanager.memory.process.size=1024m \
 -Dkubernetes.taskmanager.cpu=1 \
 -Dtaskmanager.numberOfTaskSlots=4 \
 -Dresourcemanager.taskmanager-timeout=360





??svc??cluster-Ip
./bin/flink run -d \
 -e kubernetes-session \
 -Dkubernetes.namespace=flink-session-cluster \
 -Dkubernetes.cluster-id=session001 \
 examples/streaming/WindowJoin.jar





??k8s??

Re:?????? flink sql????kafka join??????????????????????

2021-04-22 文章 Michael Ran
??
?? 2021-04-22 11:21:55??""  ??
>Tidb??Tidb??TiDBstructured-streaming??
>??
>
>
>
>
>----
>??:
>"user-zh"  
>  
>:2021??4??22??(??) 10:50
>??:"user-zh"
>:Re: flink sqlkafka join??
>
>
>
>??SQLparse json??join
>SQL??join70s=3.8k3
>
>??JOIN??
>TiDB??
>useUnicode=truecharacterEncoding=UTF-8serverTimezone=Asia/ShanghairewriteBatchedStatements=true
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:flink mysql cdc????

2021-04-22 文章 Michael Ran
CDCbinlog
?? 2021-04-22 14:22:18??"" <1353637...@qq.com> ??
>??flink mysql cdc
>1.flink mysql 
>cdc??mysql??binlog??mysql
> 


?????? ????upsert-kafka connector??????

2021-04-22 文章 op
??upsert-kafka??key




----
??: 
   "user-zh"



Re:请问在使用processfunction 中的processelement()和onTimer()需要考虑并发问题吗?

2021-04-22 文章 李一飞
这两方法是同步的方式执行的,同时只能执行一个
在 2021-04-22 15:35:07,"x2009438"  写道:
>如题,谢谢各位。
>
>
>发自我的iPhone


Re: 疑问:当开启state.backend.incremental 后 Checkpointed Data Size 会不断变大

2021-04-22 文章 HunterXHunter
没解决,我只能把它关闭了



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink run命令是否支持读取远程文件系统中的jar文件?

2021-04-22 文章 casel.chen
flink 
run是否支持读取远程文件系统,例如oss://或hdfs://路径下的jar文件?看源码是需要构建PakcagedProgram,而它的构造函数中有一个File
 jarFile参数。不知是否能够从oss路径或hdfs路径构建出File对象。

回复:Flink 1.12.0 隔几个小时Checkpoint就会失败

2021-04-22 文章 田向阳
唉,这个问题着实让人头大,我现在还没找到原因。你这边确定了跟我说一声哈


| |
田向阳
|
|
邮箱:lucas_...@163.com
|

签名由 网易邮箱大师 定制

在2021年04月22日 20:56,张锴 写道:
你好,我也遇到了这个问题,你的checkpoint是怎么配置的,可以参考一下吗

Haihang Jing  于2021年3月23日周二 下午8:04写道:

> 你好,问题定位到了吗?
> 我也遇到了相同的问题,感觉和checkpoint interval有关
> 我有两个相同的作业(checkpoint interval
> 设置的是3分钟),一个运行在flink1.9,一个运行在flink1.12,1.9的作业稳定运行,1.12的运行5小时就会checkpoint
> 制作失败,抛异常 org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
> tolerable failure threshold.
> 当我把checkpoint interval调大到10分钟后,1.12的作业也可以稳定运行,所以我怀疑和制作间隔有关。
>
> 看到过一个issuse,了解到flink1.10后对于checkpoint机制进行调整,接收端在barrier对齐时不会缓存单个barrier到达后的数据,意味着发送方必须在barrier对齐后等待credit
> feedback来传输数据,因此发送方会产生一定的冷启动,影响到延迟和网络吞吐量。但是不确定是不是一定和这个相关,以及如何定位影响。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 1.12.0 隔几个小时Checkpoint就会失败

2021-04-22 文章 张锴
你好,我也遇到了这个问题,你的checkpoint是怎么配置的,可以参考一下吗

Haihang Jing  于2021年3月23日周二 下午8:04写道:

> 你好,问题定位到了吗?
> 我也遇到了相同的问题,感觉和checkpoint interval有关
> 我有两个相同的作业(checkpoint interval
> 设置的是3分钟),一个运行在flink1.9,一个运行在flink1.12,1.9的作业稳定运行,1.12的运行5小时就会checkpoint
> 制作失败,抛异常 org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
> tolerable failure threshold.
> 当我把checkpoint interval调大到10分钟后,1.12的作业也可以稳定运行,所以我怀疑和制作间隔有关。
>
> 看到过一个issuse,了解到flink1.10后对于checkpoint机制进行调整,接收端在barrier对齐时不会缓存单个barrier到达后的数据,意味着发送方必须在barrier对齐后等待credit
> feedback来传输数据,因此发送方会产生一定的冷启动,影响到延迟和网络吞吐量。但是不确定是不是一定和这个相关,以及如何定位影响。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于upsert-kafka connector的问题

2021-04-22 文章 Shengkai Fang
Hi,

请问是有什么具体的问题吗?

Best,
Shengkai

op <520075...@qq.com> 于2021年4月22日周四 下午6:05写道:

> 用 upsert-kafka connector 作为source,会有key的插入和更新出现乱序导致结果不准的问题吗?
> 谢谢


????upsert-kafka connector??????

2021-04-22 文章 op
?? upsert-kafka connector 
source??key??


flink1.12.2,interval join并没有 inProcessingTime() and inEventTime()

2021-04-22 文章 tianxy

 

FLIP-134: Batch execution for the DataStream API
Allow explicitly configuring time behaviour on KeyedStream.intervalJoin()
FLINK-19479

Before Flink 1.12 the KeyedStream.intervalJoin() operation was changing
behavior based on the globally set Stream TimeCharacteristic. In Flink 1.12
we introduced explicit inProcessingTime() and inEventTime() methods on
IntervalJoin and the join no longer changes behaviour based on the global
characteristic.


flink1.12.2,interval join并没有 inProcessingTime() and inEventTime()并没有找到对应的方法 
是不是还不支持 求解答!!!



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink1.12.2使用rocksdb状态后端,checkpoint size变大

2021-04-22 文章 tianxy
452 
COMPLETED
103/103 2021-04-22 17:29:12 2021-04-22 17:29:12 325ms   4.40 MB 0 B 
(5.39 KB)
451 
COMPLETED
103/103 2021-04-22 17:28:12 2021-04-22 17:28:12 122ms   4.43 MB 9.36 KB 
(15.2
KB)
450 
COMPLETED
103/103 2021-04-22 17:27:12 2021-04-22 17:27:12 124ms   4.43 MB 0 B 
(5.39 KB)
449 
COMPLETED
103/103 2021-04-22 17:26:12 2021-04-22 17:26:12 112ms   4.48 MB 0 B 
(5.39 KB)
448 
COMPLETED
103/103 2021-04-22 17:25:12 2021-04-22 17:25:12 117ms   4.35 MB 0 B 
(5.39 KB)
447 
COMPLETED
103/103 2021-04-22 17:24:12 2021-04-22 17:24:12 113ms   4.41 MB 113 B 
(5.71
KB)
446 
COMPLETED
103/103 2021-04-22 17:23:12 2021-04-22 17:23:12 108ms   4.42 MB 9 B 
(5.86 KB)
445 
COMPLETED
103/103 2021-04-22 17:22:12 2021-04-22 17:22:12 130ms   4.47 MB 0 B 
(5.39 KB)
444 
COMPLETED
103/103 2021-04-22 17:21:12 2021-04-22 17:21:12 155ms   4.35 MB 0 B 
(5.39 KB)
443 
COMPLETED
103/103 2021-04-22 17:20:12 2021-04-22 17:20:12 119ms   4.45 MB 19.0 KB 
(46.1
KB)


flink1.12.2使用rocksdb作为后端存储时,发现ck size不断变大,我使用的是interval join
按理是不断失效的,应该在某个值附近摆动,同样的程序换成使用 fsstatebackend时发现就一直维持在几百k附近 不会一直变大。
请问这个是什么原因呢? 本来采取fsstatebackend,结果发现 运行一段时间(比如几个小时后)就会突然出现ck
失败,5分钟内无法完成ck超时,看日志并不是作业报错,只是单纯的 报以下错误:

Checkpoint 19 of job dd7b3ab0ec365d23c5dfa25dcf53a730 expired before complet
java.util.concurrent.CancellationException: null

请教大佬,求解答!



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 疑问:当开启state.backend.incremental 后 Checkpointed Data Size 会不断变大

2021-04-22 文章 tianxy
你好  我也遇到了 所以这个问题你知道原因了没



--
Sent from: http://apache-flink.147419.n8.nabble.com/


请问在使用processfunction 中的processelement()和onTimer()需要考虑并发问题吗?

2021-04-22 文章 x2009438
如题,谢谢各位。


发自我的iPhone

flink mysql cdc????

2021-04-22 文章 ????
??flink mysql cdc
1.flink mysql 
cdc??mysql??binlog??mysql
 

Re: flink 1.12.2 sql-cli 写入Hive报错 is_generic

2021-04-22 文章 Rui Li
可以发一下具体的SQL语句么(包括DDL和insert)?

On Wed, Apr 21, 2021 at 5:46 PM HunterXHunter <1356469...@qq.com> wrote:

> 在ddl的时候设置了 watermark。在任务页面查看watermark的时候一直没有更新watermark
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best regards!
Rui Li


??????flink sql cdc????kafka????????????????????

2021-04-22 文章 ????
flink-cdcSourceRecord??SourceRecord??topic??
??Debezium 
mysql-conectorkafka-connectortopic??
?? ??+??+topic?? 
SourceRecord??topic



----
??: 
   "user-zh"



Re: 回复: flink sql消费kafka join普通表为何会性能爬坡?

2021-04-22 文章 Xi Shen
Cache设置大小为2w,超时时间为2h
实际上整个表大小为3w左右,考虑到整个表实际只有十几兆。我会尝试cache size设置为4w,保证整个表都能装进cache里。看会不会好一点


但是我查到现在怀疑跟savepoint有关:
- 如果我设置kafka offset=earliest,不带savepoint重启,flink
job启动消费时,lag有5000w左右,但是1分钟内就能达到约7k/s的消费速度。如下图,job在14:31启动,前面的速度特别大是因为offset重置,但是在14:33已经达到7.5k的消费速度
 
- 但是如果带savepoint启动,需要花35min才能达到这个消费速度。如下图
 



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 回复: flink sql消费kafka join普通表为何会性能爬坡?

2021-04-22 文章 Xi Shen
读JDBC table是有缓存的,看了源码,是用Guava cache实现

文档上说,整个Task Manager进程共享使用一个Cache,所以应该和广播的效果是一样的?所以应该不是查询TiDB导致的性能问题



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re:回复:flink sql cdc发到kafka消息表名信息缺失问题

2021-04-22 文章 casel.chen



我的疑问正是flink cdc集成debezium后为何会把原始信息弄丢失了?直接采用原生的debezium或者canal同步数据固然可以。但如果flink 
cdc直接能发出来的话不就可以节省这些组件和运维么?flink cdc设计的初衷也是如此。













在 2021-04-22 11:01:22,"飞翔"  写道:

既然这样,为何要用flink去同步信息,把信息的原始信息都丢失了。你可以直接采用原生的debezium或者canal同步数据,发送kafka,
比如canal的样例,虽然after 
不是很全,你可以自己去构造补全,这样你采用debezium不就好了,也就是flink-cdc为什么集成debezium的原因,更新前后都是一个完整的record





-- 原始邮件 --
发件人: "user-zh" ;
发送时间: 2021年4月22日(星期四) 上午9:41
收件人: "user-zh@flink.apache.org";
主题: flink sql cdc发到kafka消息表名信息缺失问题


最近有个需求是用flink对接mysql binlog获取变更记录发到下游kafka(类似于canal 
server或debezium功能),下游kafka消息需要有 before, after, op_type, ts, database, table 
这些字段信息。我试了如下脚本发现出来的kafka消息只有data和op_type信息,其他信息都获取不到。追踪到上游debezium(flink 
cdc是基于debezium实现的)发出来的record本身就只带data和op_type信息,问一下有没有别的办法获取到变更原始记录呢?


CREATE TABLE `binlog_table` (
`id` INT,
`name` STRING,
`sys_id` STRING,
`sequence` INT,
`filter` STRING,
`tag` STRING,
`remark` STRING,
`create_date` TIMESTAMP,
`update_date` TIMESTAMP,
`reserve` STRING,
`sys_name` STRING,
`metric_seq` INT,
`advanced_function` STRING,
`value_type` STRING,
`value_field` STRING,
`status` INT,
`syn_date` TIMESTAMP,
`confirmer` STRING,
`confirm_time` TIMESTAMP,
`index_explain` STRING,
`field_name` STRING,
`tag_values` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '${mysql.hostname}',
  'port' = '3306',
  'username' = '${mysql.username}',
  'password' = '${mysql.password}',
  'database-name' = '${mysql.database}',
  'table-name' = '${mysql.table}'
  );


CREATE TABLE `kafka_sink` (
  `id` INT,
  `name` STRING,
  `sys_id` STRING,
  `sequence` INT,
  `filter` STRING,
  `tag` STRING,
  `remark` STRING,
  `create_date` TIMESTAMP,
  `update_date` TIMESTAMP,
  `reserve` STRING,
  `sys_name` STRING,
  `metric_seq` INT,
  `advanced_function` STRING,
  `value_type` STRING,
  `value_field` STRING,
  `status` INT,
  `syn_date` TIMESTAMP,
  `confirmer` STRING,
  `confirm_time` TIMESTAMP,
  `index_explain` STRING,
  `field_name` STRING,
  `tag_values` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '${topic}',
  'properties.bootstrap.servers' = '${bootstrap.servers}',
  'format' = 'canal-json'
  );


INSERT INTO `kafka_sink`
(SELECT *
 FROM `binlog_table`);







Re:Re: flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置

2021-04-22 文章 李一飞
明白了,谢谢~
在 2021-04-21 19:58:23,"Peihui He"  写道:
>fetch.min.bytes
>fetch.wait.max.ms
>还可以用着两个参数控制下的
>
>熊云昆  于2021年4月21日周三 下午7:10写道:
>
>> 有个这个参数max.poll.records,表示一次最多获取多少条record数据,默认是500条
>>
>>
>> | |
>> 熊云昆
>> |
>> |
>> 邮箱:xiongyun...@163.com
>> |
>>
>> 签名由 网易邮箱大师 定制
>>
>> 在2021年04月20日 18:19,李一飞 写道:
>> flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置
>> 最好分流、批场景回答一下,谢谢!


Re:回复:flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置

2021-04-22 文章 李一飞
谢谢
在 2021-04-21 19:10:17,"熊云昆"  写道:
>有个这个参数max.poll.records,表示一次最多获取多少条record数据,默认是500条
>
>
>| |
>熊云昆
>|
>|
>邮箱:xiongyun...@163.com
>|
>
>签名由 网易邮箱大师 定制
>
>在2021年04月20日 18:19,李一飞 写道:
>flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置
>最好分流、批场景回答一下,谢谢!