回复:请教一下目前flink submit能不能指定额外的依赖jar

2020-11-06 文章 18868816710
不能在 Submit 模式指定的原因是 Flink 提交模式和 Spark 不同。Flink 是在客户端需要编译 
JobGgraph,这样导致在客户端运行时候就需要你所加载的 Jar,
而 Submit 模式的话,只是说把 Jar 包加载到运行的 Classpath 下;但是 Spark 程序的编译是在 服务端做的。
PS:目前 Flink-1.11 的话支持 Application 模式,有点类似 Spark 的提交模式,在这个模式下可以尝试看下是否 可以在 Submit 
模式下指定。
|
|


|
|


|
在2020年11月06日 11:12,silence 写道:
感谢回复,还是希望可以从submit上解决这个问题,不能添加依赖限制了很多应用场景,特别是针对平台来说 -- Sent from: 
http://apache-flink.147419.n8.nabble.com/

Re:Re: Re:Re: Flink StreamingFileSink滚动策略

2020-11-06 文章 hailongwang
Hi bradyMk,


Bulk-encoded Formats  只能在 Checkpoint 时滚动,详见文档一[1].


Best,
Hailong Wang


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html#bulk-encoded-formats

















在 2020-11-06 10:47:33,"bradyMk"  写道:
>Hi,guoliang_wang1335
>请问StreamingFileSink用forBulkFormat方法时,可以自定义滚动策略么?你这边实现成功了么?
>
>
>
>-
>Best Wishes
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:请教大神们关于flink-sql中数据赋值问题

2020-11-06 文章 hailongwang
Hi si_tianqiang,


自定义 UDF 可以解决你的问题吗?
比如 接收 kakfa 的数据字段定义成 hbaseQuery,然后自定义 UDF 去根据 query 查询数据。


Best,
Hailong Wang




在 2020-11-06 10:41:53,"site"  写道:
>看了官网的示例,发现sql中传入的值都是固定的,我有一个场景是从kafka消息队列接收查询条件,然后通过flink-sql映射hbase表进行查询并写入结果表。我使用了将消息队列映射表再join数据表的方式,回想一下这种方式很不妥,有什么好的方法实现sql入参的动态查询呢?


Re: flink 1.11 on k8s native session cluster模式报找不到configmap

2020-11-06 文章 Yang Wang
失败的根本原因应该不是ConfigMap找不到,warning的那个信息是因为创建JobManager deployment的时候
ConfigMap还没创建出来,不会导致失败的。

你可以参考这个地方[1]把JobManager的log的打到console里面,然后用kubectl logs 来查看,这样
可以排查JobManager一直crash backoff的原因

[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#log-files


Best,
Yang

Fy  于2020年11月4日周三 下午5:36写道:

> 您好,我也遇到了同样的问题。
> MountVolume.SetUp failed for volume "flink-config-volume" : configmap
> "flink-config-flink-mm" not found
> Back-off restarting failed container
> 查看对应namespace 下的configmap,flink-config-flink-mm已经存在。但是JobManager pod
> 还是一直在重试,不能提供服务。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: 关于cluster.evenly-spread-out-slots参数的底层原理

2020-11-06 文章 Evan



 
发件人: Shawn Huang
发送时间: 2020-11-06 16:56
收件人: user-zh
主题: Re: 关于cluster.evenly-spread-out-slots参数的底层原理
我说一下我看源码(1.11.2)之后的理解吧,不一定准确,仅供参考。
 
cluster.evenly-spread-out-slots 这个参数设置后会作用在两个地方:
1. JobMaster 的 Scheduler 组件
2. ResourceManager 的 SlotManager 组件
 
对于 JobMaster 中的 Scheduler,
它在给 execution vertex 分配 slot 是按拓扑排序的顺序依次进行的。
Scheduler 策略是会倾向于把 execution vertex 分配到它的上游节点所分配到的slot上,
因此在给某个具体 execution vertex 分配 slot 时都会计算出一个当前节点倾向于选择的TaskManager集合,
然后在可选的 slot 候选集中会根据三个维度来为某个slot打分,分别是:
1. 候选slot所在的 TaskManager 与倾向于选择的 TaskManager 集合中有多少个的 ResourceID
是相同的(对于standalone模式可以不考虑该维度)
2. 候选slot所在的 TaskManager 与倾向于选择的 TaskManager 集合中有多少个的 全限定域名 是相同的
3. 候选slot所在的 TaskManager 目前的资源占用率
只有配置了 cluster.evenly-spread-out-slots 后,才会考虑第三个维度,否则仅会用前面两个维度进行打分。
打分之后会选择得分最高的 slot 分配给当前的 exection vertex。
需要注意的是这里的资源利用率只是根据某个 slot 所在的 TaskManager 中剩下多少个能够分配该 execution vertex 的
slot 计算出的,
(因为 Flink 要求同一 job vertex 的并行任务不能分配到同一 slot 中),能分配的越多,资源利用率越小,否则利用率越大。
而不是指实际的CPU内存等资源利用率。
 
对于 ResourceManager 中的 SlotManager 组件(这里说的都是 Standalone 模式下的
ResourceManager),
由于 JobMaster 的 slot 都是要向 resource manager 申请的。如果 JobMaster 需要新的 slot 了,会向
ResourceManager 的 SlotManager 组件申请。
如果没有配置 cluster.evenly-spread-out-slots 的话,SlotManager 从可用 slot 中随机返回一个。
如果配置了 cluster.evenly-spread-out-slots,SlotManager 会返回资源利用率最小的一个 slot。
这里的资源利用率计算方式是:看某个 slot 所在的 TaskManager 中有多少 slot 还没有被分配,空闲的越多,利用率越小,否则越大。
 
最后,你提问中说的均衡我没有太理解。某个算子的并发子任务是不会被分配到同一个slot中的,
但如果想把这些子任务均匀分配到不同机器上,这个当前的调度算法应该是无法保证的。
 
Best,
Shawn Huang
 
 
赵一旦  于2020年11月5日周四 下午10:18写道:
 
> 有没有人对cluster.evenly-spread-out-slots参数了解比较深入的给讲解下。
>
> 我主要想知道,设置这个参数为true之后。Flink是以一个什么样的规则去尽可能均衡分配的。
> standalone集群模式下,每个机器性能相同,flink slot数量配置相同情况下。基于*这种分配规则*,有没有一种方法让Flink做到
> *完全均衡*,而*不是尽可能均衡*?
>
> 此外,我说的“均衡”都特指算子级别的均衡。不要5机器一共5个slot,然后任务有5个算子,每个算子单并发并且通过不同的share
> group各独占1个slot这种均衡。我指的是每个算子都均衡到机器(*假设并发设置合理*)。
>


flink cdc时间问题

2020-11-06 文章 赵帅
关于cdc有个问题,求大佬能否解释下,是解析bin log的bug还是自己代码bug;



mysql数据库中表,创建时间和修改时间设置为current_timestamp


场景一:插入数据
插入数据时忽略创建时间和修改时间字段


cdc接入后,转存到hbase中,转为字符串时间,时间少8个小时


确认了,程序运行的服务器时间、mysql服务器的时间,和hbase服务器的时间,均为UTC+0800时区


场景二:重启服务,重新读取数据
此时cdc接入数据,会将最后的数据拿出来写入hbase,此时按照同样的执行,数据库时间也是放的正确时间,hbase时间也能吻合


场景二佐证了场景一中的所有环境时间时区统一问题




求问下,场景一中出现此情况的原因

Re: 关于cluster.evenly-spread-out-slots参数的底层原理

2020-11-06 文章 Shawn Huang
我说一下我看源码(1.11.2)之后的理解吧,不一定准确,仅供参考。

cluster.evenly-spread-out-slots 这个参数设置后会作用在两个地方:
1. JobMaster 的 Scheduler 组件
2. ResourceManager 的 SlotManager 组件

对于 JobMaster 中的 Scheduler,
它在给 execution vertex 分配 slot 是按拓扑排序的顺序依次进行的。
Scheduler 策略是会倾向于把 execution vertex 分配到它的上游节点所分配到的slot上,
因此在给某个具体 execution vertex 分配 slot 时都会计算出一个当前节点倾向于选择的TaskManager集合,
然后在可选的 slot 候选集中会根据三个维度来为某个slot打分,分别是:
1. 候选slot所在的 TaskManager 与倾向于选择的 TaskManager 集合中有多少个的 ResourceID
是相同的(对于standalone模式可以不考虑该维度)
2. 候选slot所在的 TaskManager 与倾向于选择的 TaskManager 集合中有多少个的 全限定域名 是相同的
3. 候选slot所在的 TaskManager 目前的资源占用率
只有配置了 cluster.evenly-spread-out-slots 后,才会考虑第三个维度,否则仅会用前面两个维度进行打分。
打分之后会选择得分最高的 slot 分配给当前的 exection vertex。
需要注意的是这里的资源利用率只是根据某个 slot 所在的 TaskManager 中剩下多少个能够分配该 execution vertex 的
slot 计算出的,
(因为 Flink 要求同一 job vertex 的并行任务不能分配到同一 slot 中),能分配的越多,资源利用率越小,否则利用率越大。
而不是指实际的CPU内存等资源利用率。

对于 ResourceManager 中的 SlotManager 组件(这里说的都是 Standalone 模式下的
ResourceManager),
由于 JobMaster 的 slot 都是要向 resource manager 申请的。如果 JobMaster 需要新的 slot 了,会向
ResourceManager 的 SlotManager 组件申请。
如果没有配置 cluster.evenly-spread-out-slots 的话,SlotManager 从可用 slot 中随机返回一个。
如果配置了 cluster.evenly-spread-out-slots,SlotManager 会返回资源利用率最小的一个 slot。
这里的资源利用率计算方式是:看某个 slot 所在的 TaskManager 中有多少 slot 还没有被分配,空闲的越多,利用率越小,否则越大。

最后,你提问中说的均衡我没有太理解。某个算子的并发子任务是不会被分配到同一个slot中的,
但如果想把这些子任务均匀分配到不同机器上,这个当前的调度算法应该是无法保证的。

Best,
Shawn Huang


赵一旦  于2020年11月5日周四 下午10:18写道:

> 有没有人对cluster.evenly-spread-out-slots参数了解比较深入的给讲解下。
>
> 我主要想知道,设置这个参数为true之后。Flink是以一个什么样的规则去尽可能均衡分配的。
> standalone集群模式下,每个机器性能相同,flink slot数量配置相同情况下。基于*这种分配规则*,有没有一种方法让Flink做到
> *完全均衡*,而*不是尽可能均衡*?
>
> 此外,我说的“均衡”都特指算子级别的均衡。不要5机器一共5个slot,然后任务有5个算子,每个算子单并发并且通过不同的share
> group各独占1个slot这种均衡。我指的是每个算子都均衡到机器(*假设并发设置合理*)。
>


KeyBy如何映射到物理分区

2020-11-06 文章 zxyoung
Hi,请教下各位:
   我的场景是现在有个Keyby操作,但是我需要指定某一个key落地在某一个具体物理分区中。
   我注意到keyby中得KeySelector仅仅是逻辑的分区,其实还是通过hash的方式来物理分区,没有办法指定哪一个key到哪一个分区去做。
   
我尝试使用partitionCustom中带有partitioner和keySelector的参数函数,但是发现没有办法直接使用类似Sum一类的聚合函数,实际测试发现Sum会将同一物理分区、但是不同Key的值都累加起来。
   例如Tuple2,id=1/2/3的给分区0,id=4的给分区1,直接使用sum的话,会将id=1/2/3的time都累加起来。
   有什么方法能让keyby方法也能够物理分区吗?还是只能在partitionCustom后给map算子加逻辑使得累加操作正确。