Re:Re: Re: Re: Flink 读取 Kafka 多个 Partition 问题,

2020-10-22 文章 Lynn Chen
hi ,all 我的问题解决了. 出现该问题的原因如下: 因为通过堡垒机端口转发, 所以需要在bootstrap.servers 写上所有 kafka borker即可 1. 修改 kafka 外网配置 >> broker1 配置: >> listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797 >> advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797 >> listener.security.protocol.map=PLAINTEXT:PLAINT

答复: Flink 1.11里如何parse出未解析的执行计划

2020-10-22 文章 刘首维
Hi, 我的做法如我所说,是用反射将parser拿出来的,比较hack但是很简单而且很稳妥 代码差不多就是下面这个样子 Flink version: custom version base on 1.11.x @PostConstruct private void setup() throws NoSuchFieldException, IllegalAccessException { final StreamTableEnvironmentImpl env = (StreamTableEnvironmentImpl) support.getStreamT

Flink mysqlCDC ,然后jdbc sink 到mysql 乱序问题

2020-10-22 文章 air23
你好, 这边发现使用cdc读取mysql ,然后写入mysql会有乱序问题 在上游mysql update一条数据,connert=print是有一条delete 和一条insert的数据, 但是jdbc写入mysql时候 发现mysql有时候是正常的,但是有时候会没有,当把并行度改成1的时候是正常的。 这边怀疑是乱序了,先insert 再delete了。所以导致结果表 没有这条数据,请问flink sql 或者flink cdc 怎么保证有序。

请问flink1.11版本如何设置checkpoint的默认保存个数

2020-10-22 文章 chenxuying
我看官方文档[1]应该是设置state.checkpoints.num-retained , 默认是1, 但是设置了没有效果, 官方说默认是1, 但是我发现好像是10 , 同时我也设置了其他的属性,比如 execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION 是可行,所以我的设置应该没有什么问题 [1]:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.h

Re: pyflink和flink版本的兼容性问题

2020-10-22 文章 zhisheng
估计可能会有问题,很多变动 whh_960101 于2020年10月23日周五 上午11:41写道: > Hi,各位大佬, > 想请教一下,我的flink的版本是1.10.0,pyflink版本是1.11.1,目前使用pyflink没有兼容性问题,想问一下,马上要更新的flink > 1.12,如果在正式发布后,我只是pip install --upgrade apache-flink==1.12 > 升级pyflink到1.12.0,flink 1.10.0 版本保持不变,会存在很多兼容性问题吗

Re: Re: Re: Flink 读取 Kafka 多个 Partition 问题,

2020-10-22 文章 zhisheng
hi 既然你只能消费到一个分区的数据,那么可以肯定的是消费能拿到的只是一个分区的数据,另外看到你说 > 用于本地开发调试 kafka(连到一个堡垒机xxx-b-1,转发 9797 到 xxx-a-1) 建议看看是不是这个转发有问题,只转发了一个节点 Best zhisheng Lynn Chen 于2020年10月23日周五 上午11:01写道: > > > > hi, zhisheng: > > > 我解析 json 后: > (xxx, xxx, xxx, topic, partition, offset) > => > > > (false,1603420582310,"

pyflink和flink版本的兼容性问题

2020-10-22 文章 whh_960101
Hi,各位大佬, 想请教一下,我的flink的版本是1.10.0,pyflink版本是1.11.1,目前使用pyflink没有兼容性问题,想问一下,马上要更新的flink 1.12,如果在正式发布后,我只是pip install --upgrade apache-flink==1.12 升级pyflink到1.12.0,flink 1.10.0 版本保持不变,会存在很多兼容性问题吗

Re:Re: Re: Flink 读取 Kafka 多个 Partition 问题,

2020-10-22 文章 Lynn Chen
hi, zhisheng: 我解析 json 后: (xxx, xxx, xxx, topic, partition, offset) => (false,1603420582310,"INSERT","test3.order",2,75) (false,1603421312803,"INSERT","test3.order",2,76) (false,1603421344819,"INSERT","test3.order",2,77) (false,1603421344819,"INSERT","test3.order",2,78) 我增加十几条数据, 拿到的都是 p

Re: pyflink和pyspark中引用包不兼容,li例如pyspark 中pandas版本1.1.4 而pyflink是>=0.23<0.25

2020-10-22 文章 Xingbo Huang
Hi, pyspark对pandas版本的限制是>=0.23.2,你安装的话就默认安装了最新的版本的pandas,这是有很大的潜在风险的。 在pyflink 1.11版本的时候pdandas的版本限制是pandas>=0.23.4,<=0.25.3,使用更稳定的pandas的版本可以规避很多风险。而且这个版本范围也在pyspark的范围内,是能一起用的。 Best, Xingbo xuzh 于2020年10月23日周五 上午9:39写道: > pyflink和pyspark中引用包不兼容,li例如pyspark 中pandas版本1.1.4 而pyflink是>=0.23<0.25.

pyflink??pyspark????????????????li????pyspark ??pandas????1.1.4 ??pyflink??>=0.23<0.25

2020-10-22 文章 xuzh
pyflink??pysparklipyspark ??pandas1.1.4 ??pyflink??>=0.23<0.25. ??pandas?? pyflink??pyspak

Re: Re: Flink 读取 Kafka 多个 Partition 问题,

2020-10-22 文章 zhisheng
hi 如果是要排查问题的话可以在消费 kafka 的时候通过 JSONKeyValueDeserializationSchema 来将数据的元数据(topic/parttion/offset)获取,这样可以排查你的数据到底来自哪些分区,这样你就不会再有困惑了。 eg: env.addSource(new FlinkKafkaConsumer011<>( parameters.get("topic"),new JSONKeyValueDeserializationSchema(true), buildKafkaProps(parameters)

Re: Re: flink1.11加载外部jar包进行UDF注册

2020-10-22 文章 amen...@163.com
是的,正如@chenxuying 和@zhisheng 所说, 我这边采用的方案是通过pipeline.classpaths参数将需要的udf jar添加到类路径中,但是当task被分配到tm去执行时仍需要找到所需udf jar才行,所以在1.11版本中我采用-yt参数将/plugins插件目录上传至hdfs,即可解决这个问题~ best, amenhub 发件人: zhisheng 发送时间: 2020-10-22 23:28 收件人: user-zh 主题: Re: flink1.11加载外部jar包进行UDF注册 hi flink 1.11 如果是要管理 udf

请问Oracle作为维表注册到flinksql环境怎么支持?

2020-10-22 文章 Bruce
你好,我看目前jdbc connector仅支持mysql,postgresql可以注册到flinksql,我想把Oracle维表注册进去,怎样扩展connector可以实现呢? 发自我的iPhone

Re: Re: Flink-1.11.1 Rest API使用

2020-10-22 文章 amen...@163.com
还真是不支持,多谢解惑~ 发件人: Peidian Li 发送时间: 2020-10-22 19:13 收件人: user-zh 主题: Re: Flink-1.11.1 Rest API使用 Yarn 的proxy server不支持POST请求,这是前几天yarn同事给我截的图: 我们改了下proxy server的逻辑来支持POST请求就可以了 一个stop with savepoint例子: http://zjy-hadoop-prc-ct11.bj:21001/proxy/application_1600936402499_375893/jobs/790e4740ba

Re:FlinkSQL 窗口使用问题

2020-10-22 文章 hailongwang
Hi Roc, 这边涉及到 order by 和 limit 一起使用时如何执行的问题。 1. 只对找到 limit n 的数据后,然后进行 order by,并不是对所有的数据; 2. 对所有的数据进行 order by 后,再 limit; 目前看 flink 对 `StreamExecSortLimit` 只保证输出 limit n,但是不保证输出的 limit n 是排序的。 如果业务允许的话,可以在 limit 后面加个 offset,这样可以使用 `emitRecordsWithRowNumber`,保证 limit n 最后是 order by 的。 个人觉得 应该将 `St

Re:Re: Flink 读取 Kafka 多个 Partition 问题,

2020-10-22 文章 Lynn Chen
hi, Qijun Feng: 我也遇到了类似的问题, 请问您后来是怎么解决的哈? 在 2020-04-03 09:27:52,"LakeShen" 写道: >Hi Qijun, > >看下 kafka 是不是所有分区都有数据呢,或者在这个时间截后:158567040L,后面是不是只有分区3写入数据,个人的想法。 > >Best, >LakeShen > >Qijun Feng 于2020年4月2日周四 下午5:44写道: > >> Dear All, >> >> 我的 Kafka cluster 有三个机器,topic 也分了

Re:使用Flink Table API & SQL编写流应用,SQL中的NOW()等时间函数如何理解

2020-10-22 文章 hailongwang
Hi Longdexin, 根据文档[1]描述,now 函数是非确定性的,意思是不会在 RelNode 优化阶段将其 常量折叠优化掉,所以这个函数是会不断更新的,并不是启动的时间,并且一直不变。 在自定义 UDF 时候,可以覆盖方法 `default boolean isDeterministic ` 来决定是确定性的还是非确定性的,默认是true。 [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html#temporal-fu

Re: flink1.11加载外部jar包进行UDF注册

2020-10-22 文章 zhisheng
hi flink 1.11 如果是要管理 udf jar 的话应该是可以通过 yarn-provided-lib-dirs [1] 这个参数去控制 udf jar 的路径地址,ps,这个参数只在 1.11 才支持 [1] https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#yarn-provided-lib-dirs Best zhisheng Husky Zeng <568793...@qq.com> 于2020年10月22日周四 上午11:31写道: > > http://apa

Re: Flink1.10 history server无法监控 FlinkSQL任务

2020-10-22 文章 zhisheng
Hi Robin: 1、是不是更改了刷新时间?一直不显示吗? 2、running 的作业不会显示的,你可以之间在 yarn 查看,history server 应该是只提供展示挂掉的作业 PS:另外提几个 history server 的问题 1、挂掉的作业展示能否支持分页呢?目前直接在一个页面全部展示了历史所有的作业,打开会很卡 2、有办法可以查看挂掉作业的 jm 和 tm 日志吗?因为 HDFS 其实是有日志,按道理是可以拿到日志信息然后解析展示出来的,Spark history server 也是可以查看挂掉作业的日志 Best! zhisheng Robin Zhan

使用Flink Table API & SQL编写流应用,SQL中的NOW()等时间函数如何理解

2020-10-22 文章 Longdexin
请问,当流应用运行起来后,随着时间的推移,比如,到第二天了,SQL中的NOW()会随着处理时间不断更新,从而保证处理逻辑的正确性吗?在我的理解中,在流应用启动的那一刻,NOW()的值就确定了,以后也不会再改变了,那么,使用什么方式可以让SQL中的时间比较逻辑与时俱进呢?非常感谢。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

2020-10-22 文章 Xingbo Huang
Hi, 从源码编译安装把。可以参考文档[1] [1] https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink Best, Xingbo whh_960101 于2020年10月22日周四 下午6:47写道: > 现在能更新到1.12吗,好像还没有发布,有什么其他的解决办法吗? > > > > > > > > > > > > > > > > 在 2020-10-22 16:34:56,"Yangze Guo" 写道: > >1.11版本中尚不支持user

Re:Re: pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

2020-10-22 文章 whh_960101
现在能更新到1.12吗,好像还没有发布,有什么其他的解决办法吗? 在 2020-10-22 16:34:56,"Yangze Guo" 写道: >1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1] > >[1] https://issues.apache.org/jira/browse/FLINK-18361 > >Best, >Yangze Guo > >On Thu, Oct 22, 2020 at 3:47 PM whh_960101 wrote: >> >> Hi,各位大佬们

Flink1.10 history server无法监控 FlinkSQL任务

2020-10-22 文章 Robin Zhang
如下图,Flink 1.10 on yarn per job提交方式,如果是java datastream 以及table api开发的应用,能够被jm正常拉取统计信息,但是sql化的job没有办法被历史服务器监控。 使用的sql不完全是官网的,但是是经过转化为datastream,以on yarn per job方式提交到yarn运行的,只是多了个sql解析动作。不能理解 ,为什么历史服务器没有加载job信息到hdfs上的目标目录。查看jobmanager日志以及configuration都能确定jm加载到了历史服务器的相关配置。

Re: 求教,如果想要从flinksql中提取出列级别的血缘关系,请问有什么好的方案吗

2020-10-22 文章 Danny Chan
你可以了解下 Calcite 的 metadata 系统,其中有一个 metadata: RelMdColumnOrigins 可以拿到 column 的血缘,前提是你要拿到 SQL 对的关系表达式树。 Best, Danny Chan 在 2020年10月20日 +0800 PM8:43,dawangli ,写道: > 求教,如果想要从flinksql中提取出列级别的血缘关系,请问有什么好的方案吗

Re: flink sql 写入hive问题

2020-10-22 文章 Jingsong Li
writer的并行度是根据上游并行度来的 committer的并行度才是1 On Thu, Oct 22, 2020 at 5:22 PM 酷酷的浑蛋 wrote: > 我用flink sql实时写入hive表时发现sink的并行度为1? > 我看了FileSystemTableSink类的226行,确实设置了1,这是为什么呢? 并行度1的写入速度很慢 > > > > -- Best, Jingsong Lee

flink sql 写入hive问题

2020-10-22 文章 酷酷的浑蛋
我用flink sql实时写入hive表时发现sink的并行度为1? 我看了FileSystemTableSink类的226行,确实设置了1,这是为什么呢? 并行度1的写入速度很慢

Re:Re: pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

2020-10-22 文章 whh_960101
现在能更新到1.12吗,好像还没有发布,有什么其他的解决办法吗? 在 2020-10-22 16:34:56,"Yangze Guo" 写道: >1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1] > >[1] https://issues.apache.org/jira/browse/FLINK-18361 > >Best, >Yangze Guo > >On Thu, Oct 22, 2020 at 3:47 PM whh_960101 wrote: >> >> Hi,各位大佬们

Re:Re: pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

2020-10-22 文章 whh_960101
现在能更新到1.12吗,好像还没有发布,有什么其他的解决办法吗? 在 2020-10-22 16:34:56,"Yangze Guo" 写道: >1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1] > >[1] https://issues.apache.org/jira/browse/FLINK-18361 > >Best, >Yangze Guo > >On Thu, Oct 22, 2020 at 3:47 PM whh_960101 wrote: >> >> Hi,各位大

Re: pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

2020-10-22 文章 Yangze Guo
1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1] [1] https://issues.apache.org/jira/browse/FLINK-18361 Best, Yangze Guo On Thu, Oct 22, 2020 at 3:47 PM whh_960101 wrote: > > Hi,各位大佬们:如果要sink数据到elasticsearch host有访问权限,elasticsearch > connector如何写入用户名密码我按照官网里的样例格式来写的,没有找到options写入

pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

2020-10-22 文章 whh_960101
Hi,各位大佬们:如果要sink数据到elasticsearch host有访问权限,elasticsearch connector如何写入用户名密码我按照官网里的样例格式来写的,没有找到options写入用户名密码,程序报错Unauthorizedhttps://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.htmlCREATE TABLE myUserTable ( user_id STRING, user_name STRING uv BIG