Re: 回复: flink sql 如何提高下游并发度?

2022-01-16 文章 venn
flink 1.12 及以上版本  sql kafka sink 支持参数: sink.parallelism  指定 sink 
的并行度


On 2022/1/11 17:06, 许友昌 wrote:

hi,


设置了parallelism=10 ,实际上是分配了 10 个 slot,flink 是会共享 slot 的,所以 sink 会有 10 线程。

在2022年1月11日 16:53,RS 写道:
Hi,
请教下,比如设置了parallelism=10,source kafka的topic分区为3,那source、后面的处理和sink的并发度是3还是10?
如果source是10的话,那还有7个线程就空闲了?



在 2022-01-11 11:10:41,"Caizhi Weng"  写道:
Hi!

可以设置 parallelism.default 为需要的并发数。

Jeff  于2022年1月9日周日 19:44写道:

当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?




Re: flink算子级别资源使用设置

2022-01-16 文章 venn
目前只要部分 sink 算子支持参数: sink.parallelism   ,参考 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-146%3A+Improve+new+TableSource+and+TableSink+interfaces 



On 2022/1/16 14:00, casel.chen wrote:

flink是否支持算子级别资源使用设置?
如果是flink sql 能否根据生成的Graph配置细粒度资源配置?




Re: Tumbling Windows 窗口可开的最小单位

2021-11-07 文章 venn
1ms ,但是在数量不是特别大的场景下,小长度的窗口没有意义,flink 
默认的网络缓冲区超时时间是 100ms


应该不怎么影响性能,就跟你在流中直接跟一个 
process方法差不多(定时器一直在刷)


On 2021/11/5 12:32, 李航飞 wrote:

滚动窗口最小可开多大,100ms?
对性能有什么影响吗?




flink 动态分区策略

2020-09-11 文章 venn
各位大佬,在执行flink 流任务的时候,经常会出现,某几台服务器的 CPU比较高(共
用集群,还有很多其他组件),导致在这些机器上的算子的延迟远远高于其他机器上的
算子,

请 flink 是否有动态分区策略或者 Taskmanager 迁移策略,可以完成类似于 spark
在算子执行很慢的情况下,master 会起一个一样的算子,如果后起的算子先完成任
务,任务也可见继续往下游执行。

感谢各位大佬



RE: 关于sink失败 不消费kafka消息的处理

2020-08-26 文章 venn
可以参考下这个: 
https://www.cnblogs.com/bethunebtj/p/9168274.html#5-%E4%B8%BA%E6%89%A7%E8%A1%8C%E4%BF%9D%E9%A9%BE%E6%8A%A4%E8%88%AAfault-tolerant%E4%B8%8E%E4%BF%9D%E8%AF%81exactly-once%E8%AF%AD%E4%B9%89

-Original Message-
From: user-zh-return-6980-wxchunjhyy=163@flink.apache.org 
 On Behalf Of 范超
Sent: Wednesday, August 26, 2020 2:42 PM
To: user-zh@flink.apache.org
Subject: 答复: 关于sink失败 不消费kafka消息的处理

您好 BenChao ,不知道是否有可以参考的两阶段提交的Flink 实例或者文档资料

-邮件原件-
发件人: Benchao Li [mailto:libenc...@apache.org] 
发送时间: 2020年8月26日 星期三 12:59
收件人: user-zh 
主题: Re: 关于sink失败 不消费kafka消息的处理

这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。

范超  于2020年8月26日周三 上午11:38写道:

> 大家好,我现在有个疑问
> 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
>
> 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢?
>
>
> 多谢大家了
>
> 范超
>


-- 

Best,
Benchao Li


RE: flink1.11 kafka sql connector

2020-08-26 文章 venn
默认应该是 Kafka 的自动提交,开了Checkpoint 就 Checkpoint 提交

-Original Message-
From: user-zh-return-6960-wxchunjhyy=163@flink.apache.org 
 On Behalf Of Dream-底限
Sent: Wednesday, August 26, 2020 10:42 AM
To: user-zh@flink.apache.org
Subject: flink1.11 kafka sql connector

hi
我正在使用DDL语句创建kafka数据源,但是查看文档时候发现没有报漏参数指定消费者组的offset是否提交,请问这个默认情况下offset会不会提交到kafka分区


RE: Flink运行时可以转移数据吗?

2020-08-26 文章 venn
如果自己实现 KeySelector ,可以感知 下游节点的反压,动态调整 KeySelector 策
略 就可以

-Original Message-
From: user-zh-return-6979-wxchunjhyy=163@flink.apache.org
 On Behalf Of
Sun_yijia
Sent: Wednesday, August 26, 2020 2:17 PM
To: user-zh 
Subject: Flink运行时可以转移数据吗?

在做反压相关的代码,想请教各位大佬。


有一个分支节点,分支后面有两个节点A和B。假设A节点出现了反压,B节点负载空闲。
我想让B节点帮A节点做一些计算,这样B节点就能够缓解一部分A节点的压力。


有什么方法能让Flink在运行过程中,把接下来要发给A节点的数据发送给B节点吗?


RE: flink消费kafka提交偏移量

2020-07-29 文章 venn
可以参考下这个:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/
kafka.html#kafka-consumers-offset-committing-behaviour-configuration

-Original Message-
From: user-zh-return-6007-wxchunjhyy=163@flink.apache.org
 On Behalf Of 小学
生
Sent: 2020年7月30日 10:57
To: user-zh 
Subject: flink消费kafka提交偏移量

各位大佬,我自己使用flink1.11时,消费kafka数据源时候,使用group offset,在外
部的kafka tool发现offset没有及时提交,请问下这个在flink中怎么保证呢


RE: Flink使用Kafka作为source时checkpoint成功提交offset的机制

2020-07-29 文章 venn
可以这样理解,实际上souce 算子只知道这条数据发出去了,不知道这条数据执行到哪里的

-Original Message-
From: user-zh-return-5981-wxchunjhyy=163@flink.apache.org 
 On Behalf Of shuwen 
zhou
Sent: 2020年7月29日 15:10
To: user-zh@flink.apache.org
Subject: Re: Flink使用Kafka作为source时checkpoint成功提交offset的机制

比如读到一条offset值为100的消息,有Kafka Consumer source,一个Process算子和一个Sink算子
那么在这条消息到达Process算子,还未到达Sink算子时,提交offset是100吗?

On Wed, 29 Jul 2020 at 14:51, venn  wrote:

> checkpoint成功时就会把它的offset提交,可以看下这个类:  FlinkKafkaConsumerBase  的 这个方法:
> notifyCheckpointComplete
>
> -Original Message-
> From: user-zh-return-5976-wxchunjhyy=163@flink.apache.org
>  On Behalf Of 
> shuwen zhou
> Sent: 2020年7月29日 14:24
> To: user-zh@flink.apache.org
> Subject: Flink使用Kafka作为source时checkpoint成功提交offset的机制
>
> 大家好,请教一个问题,
>
> 当Flink在使用Kafka作为source,并且checkpoint开启时,提交的offset是在消息被链路最后一个算子处理之后才会把这条
> offset提交吗?
> 还是说这个消息只要从Kafka source读取后,不论链路处理到哪个算子, checkpoint成功时就会把它的offset提交呢?
>
> 另外有大神指路这段代码具体在哪个文件吗?
> 谢谢!
>
> --
> Best Wishes,
> Shuwen Zhou
>


--
Best Wishes,
Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>


RE: Flink使用Kafka作为source时checkpoint成功提交offset的机制

2020-07-28 文章 venn
checkpoint成功时就会把它的offset提交,可以看下这个类:  FlinkKafkaConsumerBase  的 这个方法: 
notifyCheckpointComplete

-Original Message-
From: user-zh-return-5976-wxchunjhyy=163@flink.apache.org 
 On Behalf Of shuwen 
zhou
Sent: 2020年7月29日 14:24
To: user-zh@flink.apache.org
Subject: Flink使用Kafka作为source时checkpoint成功提交offset的机制

大家好,请教一个问题,
当Flink在使用Kafka作为source,并且checkpoint开启时,提交的offset是在消息被链路最后一个算子处理之后才会把这条offset提交吗?
还是说这个消息只要从Kafka source读取后,不论链路处理到哪个算子, checkpoint成功时就会把它的offset提交呢?

另外有大神指路这段代码具体在哪个文件吗?
谢谢!

-- 
Best Wishes,
Shuwen Zhou


回复: flink build-in 的 udf 的源码

2020-05-17 文章 venn
非常感谢大佬,耐心回复

-邮件原件-
发件人: user-zh-return-3567-wxchunjhyy=163@flink.apache.org 
 代表 Benchao Li
发送时间: 2020年5月16日 21:50
收件人: user-zh 
主题: Re: flink build-in 的 udf 的源码

Hi,

Flink内置函数的实现方式跟udf不太一样,很多函数是直接用的代码生成来做的。

下面是以blink planner为例,大概说下流程:
1. FlinkSqlOperatorTable 这个类里面放的是内置函数表,这个表会被calcite parse
SQL的时候用到,直接把这些函数识别为具体的某个函数定义。
2.
然后再代码生成阶段,会识别到这些函数,根据不同的函数定义,生成不同的函数实现调用。这部分你可以直接看下`org.apache.flink.table.planner.codegen.calls`这个package下的代码。
3. 上面第2条说的主要是scalar function的生成方式,agg
function还要特殊一点,这部分可以参考下`org.apache.flink.table.planner.functions.aggfunctions`这个package下的代码。


venn  于2020年5月16日周六 下午3:53写道:

> 各位大佬,请问下,flink 内置的 udf 的源码在什么位置,还有在哪里完成的函数注
> 册? 非常感谢各位大佬回复
>
>
>
> Thanks a lot !
>
>
>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


回复: 回复:flink1.10 ddl metric 不显示

2020-05-16 文章 venn
Subtask 显示的metrics 是整个 算子链的输入、输出的,算子全部 chain 
在一起后,对应的输入、输出就是外部系统了,所有看不到。可以去metrics 页面查看具体算子的metrics 指标

-邮件原件-
发件人: user-zh-return-3563-wxchunjhyy=163@flink.apache.org 
 代表 了不起的盖茨比
发送时间: 2020年5月16日 11:08
收件人: user-zh 
主题: 回复:flink1.10 ddl metric 不显示

为什么chain一起就无法看到了???求大佬解释一下。





-- 原始邮件 --
发件人: zhisheng 

flink build-in 的 udf 的源码

2020-05-16 文章 venn
各位大佬,请问下,flink 内置的 udf 的源码在什么位置,还有在哪里完成的函数注
册? 非常感谢各位大佬回复

 

Thanks a lot !

 



回复: 回复: 回复: flink on yarn 的 kerberos 认证问题

2019-12-03 文章 venn
大佬,非常感谢您的回复

“zookeeper也是simple” :我们集群使用的是国内一个厂商提供的hadoop 平台,权
限里面没有关于zookeeper 的配置,Java也可以无认证直接读取zookeeper 的数据。

“确定simple模式的flink可以拿到kerberos的kafka”: 在没有认证的hadoop集群上
的flink ,确实可以访问带 Kerberos 认证的kafka(疑问:看起来kafka 和 hdfs、
hbase 的认证方式不太一样?kafka 只需要带上 keytab 文件和用户名, hdfs /
hbase 需要使用 UserGroupInformation 主动认证)

“simple模式不能访问配置kerberos集群的hdfs”:想试下
“ipc.client.fallback-to-simple-auth-allowed” 这个参数的,不让加,也是无奈

今天试了下不 flink-conf.yaml 里面认证的配合删掉,发现任务的表现是一样的,也
是卡在 hbase.get 上面,日志基本上和 加了认证的一样(除了一些 加载参数的日
志)



-邮件原件-
发件人: user-zh-return-1563-wxchunjhyy=163@flink.apache.org
 代表 guoshuai
发送时间: Wednesday, November 27, 2019 6:29 PM
收件人: user-zh@flink.apache.org
主题: Re:回复: 回复: flink on yarn 的 kerberos 认证问题

我们的flink配置kerberos的,访问kerberos的组件kafka,HBase(zk也是kerberos)和开
源的simple组件es,mysql这些 读写都没什么问题.(Kerberos都是在一个集群下的,安全
认证的用户具有访问hdfs,zk,kafka的权限)

你说你的zookeeper也是simple的,两个simple组件互相访问是没问题,   但是确定
simple的zk可以让flink访问配置kerberos的hbase及存储数据的hdfs?(这个我没测过)
1:确定simple模式的flink可以拿到kerberos的kafka,是的话应该可以排除掉simple模
式flink跨集群访问kerberos的问题
2:有没有可能是simple模式不能访问配置kerberos集群的hdfs导致的,

在配置kerberos集群的core-site.xml开启allow simple 试试

ipc.client.fallback-to-simple-auth-allowedtrue
 








在 2019-11-27 17:08:48,"venn"  写道:
>我们好像zookeeper 没有开安全认证,从Java 代码也没有添加 jaas.conf 文件,而
且
>看 日志里面有 关于 zookeeper 已经建立连接相关的日志。
>
>
>问题其实还没到这一步,我现在的问题是:“Flink on yarn 运行在不用认证的
hadoop
>集群上,是否可以访问带kerberos 认证的hadoop集群的 hbase”
>
>现在是这两种现象:
>   1、直接在不认证的hadoop集群提交 读认证hbase 的任务,可以从日志看到,
>hadoop 运行在 simple 模式(默认模式,不认证模式),对应日志“Hadoop user
set to xxx (auth: 
>SIMPLE) ”,任务卡在读hbase 的地方,直到超时
>   2、修改提交节点的 core-site.xml/hdfs-site.xml,注入一个新的
>core-site.xml 带配置参数 " Hadoop.security.authentication  = kerberos",可
以
>在日志中看到 "Hadoop user set to xxx (auth: KERBEROS)",但是任务一直处于
>“created” 状态,日志报:“server asks us to fall back to SIMPLE auth. But
the 
>client is configured to only allow secure connections”
>
>
>
>
>-邮件原件-
>发件人: user-zh-return-1559-wxchunjhyy=163@flink.apache.org
> 代表 guoshuai
>发送时间: Wednesday, November 27, 2019 2:26 PM
>收件人: user-zh@flink.apache.org
>主题: Re:回复: flink on yarn 的 kerberos 认证问题
>
>
>
>HBase认证需要ZooKeeper和Kerberos安全认证,跟ZooKeeper认证“jaas.conf”文件也
>加载进去了吗?
>
>LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userName, 
>userKeytabFile); 
>LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY,
>ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);
>LoginUtil.login(userName, userKeytabFile, krb5File, conf);
>
>
>
>
>
>
>
>
>在 2019-11-27 14:00:15,"venn"  写道:
>>我们kafka 可以正常认证、消费数据,认证hbase 和kafka 好像不一样,我们是不认
>证
>>读不到数据,认证了,任务又提交不到 yarn 上去了
>>
>>如下:
>>>看过对应位置的代码,将 “Hadoop.security.authentication =
>>kerberos” 
>>>参数添加到 Hadoop 的 配置文件中(注: 使用 simple 认证的 hadoop集
>>>群使用 amberi 部署 的 hdp 集群,不开启 Kerberos 认证 参数
>>“Hadoop.security.
>>>authentication” 的值为 simple ),使程序认证通过,但是 flink job 一直处
于
>>>created 状态,taskmanager.log 中一直报 “server asks us to fall back to
>>SIMPLE
>>>auth. But the client is configured to only allow secure connections”
>>
>>
>>
>>-邮件原件-
>>发件人: user-zh-return-1557-wxchunjhyy=163@flink.apache.org
>> 代表 guoshuai
>>发送时间: Wednesday, November 27, 2019 1:31 PM
>>收件人: user-zh@flink.apache.org
>>主题: Re:flink on yarn 的 kerberos 认证问题
>>
>>kerberos用户的krb5.conf ,user.keytab文件是否在程序运行时加载到了,我之前遇
到
>>的kerberos问题是flink读kafka获取不到数据,通过yarn-session模式运行,认证阶段
>是
>>在yarn-session发布完成的. 最后问题出在kafka通信协议,可以看下hbase端的配置,
>实
>>现不行 也可以解耦hbase跟flink中间加一个kafka
>>
>>
>>
>>
>>
>>
>>
>>
>>在 2019-11-26 14:50:32,"venn"  写道:
>>>各位大佬:
>>>
>>>请教一个flink 认证的问题: Flink on yarn 运行在不用认证的
>>Hadoop
>>>集群上,怎样访问带kerberos 认证集群的 hbase ?
>>>
>>> 
>>>
>>>下面是一些我们使用的描述和发现的问题:
>>>
>>>我们有两个hadoop 集群,一个使用 Kerberos 认证模式,一个是
>>simple
>>>认证模式,Flink 1.9.0  部署在 simple 认证的集群上。
>>>
>>>最近在使用flink 读取 Kerberos 认证的集群的 hbase 上遇到了问题。配置
>>>flink-conf.yaml 中的配置参数:security.kerberos.login.keytab 、
>>>security.kerberos.login.principal 。
>>>
>>>我们计划在 map 中同步的读取 hbase 的数据,从输入数据中获取
>>>rowkey,使用get 方式获取hbase 数据,程序启动后,呈现 “卡” 在map 算子
上,
>>直
>>>到hbase get 超时,无法读取任何数据。在 taskmanager.log 中有发现有这样的日
>>>志: 
>>>
>>>
>>>org.apache.flink.yarn.YarnTaskExecutorRunner   - OS current user: yarn
>>>
>>>org.apache.flink.yarn.YarnTaskExecutorRunner   - current Hadoop/Kerberos
>>>user: admin (注:登陆用户)
>>>
>>> 
>>>
>>>org.apache.flink.yarn.YarnTaskExecuto

回复: 回复: flink on yarn 的 kerberos 认证问题

2019-11-27 文章 venn
我们好像zookeeper 没有开安全认证,从Java 代码也没有添加 jaas.conf 文件,而且
看 日志里面有 关于 zookeeper 已经建立连接相关的日志。


问题其实还没到这一步,我现在的问题是:“Flink on yarn 运行在不用认证的hadoop
集群上,是否可以访问带kerberos 认证的hadoop集群的 hbase”

现在是这两种现象:
1、直接在不认证的hadoop集群提交 读认证hbase 的任务,可以从日志看到,
hadoop 运行在 simple 模式(默认模式,不认证模式),对应日志“Hadoop user set
to xxx (auth: SIMPLE) ”,任务卡在读hbase 的地方,直到超时
2、修改提交节点的 core-site.xml/hdfs-site.xml,注入一个新的
core-site.xml 带配置参数 " Hadoop.security.authentication  = kerberos",可以
在日志中看到 "Hadoop user set to xxx (auth: KERBEROS)",但是任务一直处于
“created” 状态,日志报:“server asks us to fall back to SIMPLE auth. But
the client is configured to only allow secure connections”




-邮件原件-
发件人: user-zh-return-1559-wxchunjhyy=163@flink.apache.org
 代表 guoshuai
发送时间: Wednesday, November 27, 2019 2:26 PM
收件人: user-zh@flink.apache.org
主题: Re:回复: flink on yarn 的 kerberos 认证问题



HBase认证需要ZooKeeper和Kerberos安全认证,跟ZooKeeper认证“jaas.conf”文件也
加载进去了吗?

LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userName,
userKeytabFile);
LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY,
ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);
LoginUtil.login(userName, userKeytabFile, krb5File, conf);








在 2019-11-27 14:00:15,"venn"  写道:
>我们kafka 可以正常认证、消费数据,认证hbase 和kafka 好像不一样,我们是不认
证
>读不到数据,认证了,任务又提交不到 yarn 上去了
>
>如下:
>>看过对应位置的代码,将 “Hadoop.security.authentication =
>kerberos” 
>>参数添加到 Hadoop 的 配置文件中(注: 使用 simple 认证的 hadoop集
>>群使用 amberi 部署 的 hdp 集群,不开启 Kerberos 认证 参数
>“Hadoop.security.
>>authentication” 的值为 simple ),使程序认证通过,但是 flink job 一直处于
>>created 状态,taskmanager.log 中一直报 “server asks us to fall back to
>SIMPLE
>>auth. But the client is configured to only allow secure connections”
>
>
>
>-邮件原件-
>发件人: user-zh-return-1557-wxchunjhyy=163@flink.apache.org
> 代表 guoshuai
>发送时间: Wednesday, November 27, 2019 1:31 PM
>收件人: user-zh@flink.apache.org
>主题: Re:flink on yarn 的 kerberos 认证问题
>
>kerberos用户的krb5.conf ,user.keytab文件是否在程序运行时加载到了,我之前遇到
>的kerberos问题是flink读kafka获取不到数据,通过yarn-session模式运行,认证阶段
是
>在yarn-session发布完成的. 最后问题出在kafka通信协议,可以看下hbase端的配置,
实
>现不行 也可以解耦hbase跟flink中间加一个kafka
>
>
>
>
>
>
>
>
>在 2019-11-26 14:50:32,"venn"  写道:
>>各位大佬:
>>
>>请教一个flink 认证的问题: Flink on yarn 运行在不用认证的
>Hadoop
>>集群上,怎样访问带kerberos 认证集群的 hbase ?
>>
>> 
>>
>>下面是一些我们使用的描述和发现的问题:
>>
>>我们有两个hadoop 集群,一个使用 Kerberos 认证模式,一个是
>simple
>>认证模式,Flink 1.9.0  部署在 simple 认证的集群上。
>>
>>最近在使用flink 读取 Kerberos 认证的集群的 hbase 上遇到了问题。配置
>>flink-conf.yaml 中的配置参数:security.kerberos.login.keytab 、
>>security.kerberos.login.principal 。
>>
>>我们计划在 map 中同步的读取 hbase 的数据,从输入数据中获取
>>rowkey,使用get 方式获取hbase 数据,程序启动后,呈现 “卡” 在map 算子上,
>直
>>到hbase get 超时,无法读取任何数据。在 taskmanager.log 中有发现有这样的日
>>志: 
>>
>>
>>org.apache.flink.yarn.YarnTaskExecutorRunner   - OS current user: yarn
>>
>>org.apache.flink.yarn.YarnTaskExecutorRunner   - current Hadoop/Kerberos
>>user: admin (注:登陆用户)
>>
>> 
>>
>>org.apache.flink.yarn.YarnTaskExecutorRunner   - YARN daemon is running
as:
>>admin Yarn client user obtainer: admin
>>
>>org.apache.flink.runtime.security.modules.HadoopModule  - Hadoop user 
>>set to admin (auth:SIMPLE)
>>
>> 
>>
>>看过对应位置的代码,将 “Hadoop.security.authentication =
>kerberos” 
>>参数添加到 Hadoop 的 配置文件中(注: 使用 simple 认证的 hadoop集
>>群使用 amberi 部署 的 hdp 集群,不开启 Kerberos 认证 参数
>“Hadoop.security.
>>authentication” 的值为 simple ),使程序认证通过,但是 flink job 一直处于
>>created 状态,taskmanager.log 中一直报 “server asks us to fall back to
>SIMPLE
>>auth. But the client is configured to only allow secure connections”
>>
>> 
>>
>> 
>>
>>看到官网文档有这样的描述:
>>https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/securi
>>t
>>y-ker
>>beros.html
>>
>>
>>Hadoop Security Module
>>
>>This module uses the Hadoop UserGroupInformation (UGI) class to 
>>establish a process-wide login user context. The login user is then 
>>used for all interactions with Hadoop, including HDFS, HBase, and YARN.
>>
>>If Hadoop security is enabled (in core-site.xml), the login user will 
>>have whatever Kerberos credential is configured. Otherwise, the login 
>>user conveys only the user identity of the OS account that launched 
>>the
>cluster.
>>
>> 
>>
>> 
>>
>> 
>>


回复: flink on yarn 的 kerberos 认证问题

2019-11-26 文章 venn
我们kafka 可以正常认证、消费数据,认证hbase 和kafka 好像不一样,我们是不认证
读不到数据,认证了,任务又提交不到 yarn 上去了

如下:
>看过对应位置的代码,将 “Hadoop.security.authentication =
kerberos” 
>参数添加到 Hadoop 的 配置文件中(注: 使用 simple 认证的 hadoop集
>群使用 amberi 部署 的 hdp 集群,不开启 Kerberos 认证 参数
“Hadoop.security.
>authentication” 的值为 simple ),使程序认证通过,但是 flink job 一直处于
>created 状态,taskmanager.log 中一直报 “server asks us to fall back to
SIMPLE 
>auth. But the client is configured to only allow secure connections”



-邮件原件-
发件人: user-zh-return-1557-wxchunjhyy=163@flink.apache.org
 代表 guoshuai
发送时间: Wednesday, November 27, 2019 1:31 PM
收件人: user-zh@flink.apache.org
主题: Re:flink on yarn 的 kerberos 认证问题

kerberos用户的krb5.conf ,user.keytab文件是否在程序运行时加载到了,我之前遇到
的kerberos问题是flink读kafka获取不到数据,通过yarn-session模式运行,认证阶段是
在yarn-session发布完成的. 最后问题出在kafka通信协议,可以看下hbase端的配置,实
现不行 也可以解耦hbase跟flink中间加一个kafka








在 2019-11-26 14:50:32,"venn"  写道:
>各位大佬:
>
>请教一个flink 认证的问题: Flink on yarn 运行在不用认证的
Hadoop
>集群上,怎样访问带kerberos 认证集群的 hbase ?
>
> 
>
>下面是一些我们使用的描述和发现的问题:
>
>我们有两个hadoop 集群,一个使用 Kerberos 认证模式,一个是
simple
>认证模式,Flink 1.9.0  部署在 simple 认证的集群上。
>
>最近在使用flink 读取 Kerberos 认证的集群的 hbase 上遇到了问题。配置
>flink-conf.yaml 中的配置参数:security.kerberos.login.keytab 、
>security.kerberos.login.principal 。
>
>我们计划在 map 中同步的读取 hbase 的数据,从输入数据中获取
>rowkey,使用get 方式获取hbase 数据,程序启动后,呈现 “卡” 在map 算子上,
直
>到hbase get 超时,无法读取任何数据。在 taskmanager.log 中有发现有这样的日
>志: 
>
>
>org.apache.flink.yarn.YarnTaskExecutorRunner   - OS current user: yarn
>
>org.apache.flink.yarn.YarnTaskExecutorRunner   - current Hadoop/Kerberos
>user: admin (注:登陆用户)
>
> 
>
>org.apache.flink.yarn.YarnTaskExecutorRunner   - YARN daemon is running as:
>admin Yarn client user obtainer: admin
>
>org.apache.flink.runtime.security.modules.HadoopModule  - Hadoop user 
>set to admin (auth:SIMPLE)
>
> 
>
>看过对应位置的代码,将 “Hadoop.security.authentication =
kerberos” 
>参数添加到 Hadoop 的 配置文件中(注: 使用 simple 认证的 hadoop集
>群使用 amberi 部署 的 hdp 集群,不开启 Kerberos 认证 参数
“Hadoop.security.
>authentication” 的值为 simple ),使程序认证通过,但是 flink job 一直处于
>created 状态,taskmanager.log 中一直报 “server asks us to fall back to
SIMPLE 
>auth. But the client is configured to only allow secure connections”
>
> 
>
> 
>
>看到官网文档有这样的描述:
>https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/securit
>y-ker
>beros.html
>
>
>Hadoop Security Module
>
>This module uses the Hadoop UserGroupInformation (UGI) class to 
>establish a process-wide login user context. The login user is then 
>used for all interactions with Hadoop, including HDFS, HBase, and YARN.
>
>If Hadoop security is enabled (in core-site.xml), the login user will 
>have whatever Kerberos credential is configured. Otherwise, the login 
>user conveys only the user identity of the OS account that launched the
cluster.
>
> 
>
> 
>
> 
>


flink on yarn 的 kerberos 认证问题

2019-11-25 文章 venn
各位大佬:

请教一个flink 认证的问题: Flink on yarn 运行在不用认证的 Hadoop
集群上,怎样访问带kerberos 认证集群的 hbase ?

 

下面是一些我们使用的描述和发现的问题:

我们有两个hadoop 集群,一个使用 Kerberos 认证模式,一个是 simple
认证模式,Flink 1.9.0  部署在 simple 认证的集群上。

最近在使用flink 读取 Kerberos 认证的集群的 hbase 上遇到了问题。配置
flink-conf.yaml 中的配置参数:security.kerberos.login.keytab 、
security.kerberos.login.principal 。 

我们计划在 map 中同步的读取 hbase 的数据,从输入数据中获取
rowkey,使用get 方式获取hbase 数据,程序启动后,呈现 “卡” 在map 算子上,直
到hbase get 超时,无法读取任何数据。在 taskmanager.log 中有发现有这样的日
志: 


org.apache.flink.yarn.YarnTaskExecutorRunner   - OS current user: yarn

org.apache.flink.yarn.YarnTaskExecutorRunner   - current Hadoop/Kerberos
user: admin (注:登陆用户)

 

org.apache.flink.yarn.YarnTaskExecutorRunner   - YARN daemon is running as:
admin Yarn client user obtainer: admin

org.apache.flink.runtime.security.modules.HadoopModule  - Hadoop user set to
admin (auth:SIMPLE)

 

看过对应位置的代码,将 “Hadoop.security.authentication =
kerberos” 参数添加到 Hadoop 的 配置文件中(注: 使用 simple 认证的 hadoop集
群使用 amberi 部署 的 hdp 集群,不开启 Kerberos 认证 参数 “Hadoop.security.
authentication” 的值为 simple ),使程序认证通过,但是 flink job 一直处于
created 状态,taskmanager.log 中一直报 “server asks us to fall back to
SIMPLE auth. But the client is configured to only allow secure connections”

 

 

看到官网文档有这样的描述:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/security-ker
beros.html 


Hadoop Security Module

This module uses the Hadoop UserGroupInformation (UGI) class to establish a
process-wide login user context. The login user is then used for all
interactions with Hadoop, including HDFS, HBase, and YARN.

If Hadoop security is enabled (in core-site.xml), the login user will have
whatever Kerberos credential is configured. Otherwise, the login user
conveys only the user identity of the OS account that launched the cluster.

 

 

 



flink 读带认证的hbase 问题

2019-11-07 文章 venn
各位大佬:

请问:flink on yarn 模式(standalone 模式下也不行)下 读带
kerberos 认证的 hbase,返回认证成功了,但是还是不能查询,一直报 “Caused by:
GSSExecption: No valid credentials provided (Mechanism level: Failed to find
any Kerberos tgt)”

发现认证之后,当前用户(UserGroupInformation.getLoginUser )和登陆用户
(UserGroupInformation.getCurrentUser )不一样

认证之前,当前用户和登陆用户都是  admin (auth:SIMPLE)#admin 是登
陆系统的用户

认证之后,当前用户还是 admin (auth:SIMPLE) ,登陆用户变成了认证的那个用户
xxx (auth: KERBEROS)

程序在IDE  里面可以正常执行,当前用户和登陆用户都是用一个用户,
已确定keytab 文件 是没有问题的 

配置应该没有问题,因为其实已经认证成功了,但是看起来执行程序的用户和认证的用
户不是同一个用户,请问各位大佬有了解的吗? 

 

非常感谢各位大佬

 

报错如下:

FATAL org.apache.hadoop.ipc.RpcClient  - SASL authentication failed. The
most likely cause is missing or invalid credentials. Consider 'kinit'.

 

javax.security.sasl.SaslException: GSS initiate failed [Caused by
GSSException: No valid credentials provided (Mechanism level: Failed to find
any Kerberos tgt)]

 

Caused by: GSSException: No valid credentials provided (Mechanism level:
Failed to find any Kerberos tgt)

 



回复: Split a stream into any number of streams

2019-09-17 文章 venn
恐怕不行,sideoutput 和 split 都需要先知道要分多少个流

如sideoutput 需要先定义tag:
val late = new OutputTag[LateDataEvent]("late")


-邮件原件-
发件人: user-zh-return-1164-wxchunjhyy=163@flink.apache.org 
 代表 王佩
发送时间: Tuesday, September 17, 2019 4:25 PM
收件人: user-zh 
主题: Re: Split a stream into any number of streams

是这样的。比方有1000个事件(通过某个字段区分,事件会继续增加),都在一个kafka topic中。

Flink
从Kafka读取数据后是一个DataStream,我想将每个事件分流出来,这样每个事件都是一个DataStream,后续,可以对每个事件做各种各样的处理,如DataStream异步IO、DataStream
Sink 到Parquet。

1、如果用split...select,由于select(事件名),这里的事件名必须是某个确定的。
2、如果用side output,要提前定义output tag,我有1000个事件(事件会继续增加),这样就需要定义1000+ output tag。

感谢!

cai yi  于2019年9月17日周二 下午1:33写道:

> 可以使用Side Output,
> 将输入流根据不同需求发送到自定义的不同的OutputTag中,最后可以使用DataStream.getSideOutput(outputTag)取出你需要的流进行处理!
>
> 在 2019/9/17 上午10:05,“Wesley Peng” 写入:
>
>
>
> on 2019/9/17 9:55, 王佩 wrote:
> > I want to split a stream into any number of streams according to 
> a field,
> > and then process the split stream one by one.
>
> I think that should be easy done. refer to:
>
> https://stackoverflow.com/questions/53588554/apache-flink-using-filter
> -or-split-to-split-a-stream
>
> regards.
>
>


回复: 编译flink 1.9 flink-table-api-java 编译不过

2019-09-11 文章 venn
非常感谢,jdk 升到 1.8.0_111 解决了



-邮件原件-
发件人: user-zh-return-1139-wxchunjhyy=163@flink.apache.org 
 代表 Zili Chen
发送时间: Wednesday, September 11, 2019 10:35 AM
收件人: user-zh 
主题: Re: 编译flink 1.9 flink-table-api-java 编译不过

看起来是一个 JDK 的 bug
https://stackoverflow.com/questions/25523375/java8-lambdas-and-exceptions

你可以升级 JDK 的小版本吗?我在 8.0.212 上没遇到这个问题。

Best,
tison.


venn  于2019年9月11日周三 上午10:26写道:

> 各位大佬,请教一下编译Flink 1.9 的问题,编译 flink-table-api-java 的时候 
> 只
> 要有  “.orElseThrow(() -> new ValidationException("Undefined function: "
> + lookupCall.getUnresolvedName()));”  就不能通过编译,jdk版本是 
>  1.8.0_91,请
> 问各位大佬应该怎么处理。
>
> 报错如下:
>
>
> [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile
> (default-compile) on project flink-table-api-java: Compilation failure
>
> [ERROR]
>
> /home/venn/git/flink/flink-table/flink-table-api-java/src/main/java/or
> g/apac
>
> he/flink/table/operations/utils/factories/CalculatedTableFactory.java:
> [90,53 ] unreported exception X; must be caught or declared to be
> thrown
>
> [ERROR]
>
> [ERROR] -> [Help 1]
>
> [ERROR]
>
> [ERROR] To see the full stack trace of the errors, re-run Maven with
> the -e switch.
>
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
>
> [ERROR]
>
> [ERROR] For more information about the errors and possible solutions,
> please read the following articles:
>
> [ERROR] [Help 1]
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
>
> [ERROR]
>
> [ERROR] After correcting the problems, you can resume the build with
> the command
>
> [ERROR]   mvn  -rf :flink-table-api-java
>
>
>
>


编译flink 1.9 flink-table-api-java 编译不过

2019-09-10 文章 venn
各位大佬,请教一下编译Flink 1.9 的问题,编译 flink-table-api-java 的时候 只
要有  “.orElseThrow(() -> new ValidationException("Undefined function: " +
lookupCall.getUnresolvedName()));”  就不能通过编译,jdk版本是 1.8.0_91,请
问各位大佬应该怎么处理。

报错如下:


[ERROR] Failed to execute goal
org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile
(default-compile) on project flink-table-api-java: Compilation failure

[ERROR]
/home/venn/git/flink/flink-table/flink-table-api-java/src/main/java/org/apac
he/flink/table/operations/utils/factories/CalculatedTableFactory.java:[90,53
] unreported exception X; must be caught or declared to be thrown

[ERROR] 

[ERROR] -> [Help 1]

[ERROR] 

[ERROR] To see the full stack trace of the errors, re-run Maven with the -e
switch.

[ERROR] Re-run Maven using the -X switch to enable full debug logging.

[ERROR] 

[ERROR] For more information about the errors and possible solutions, please
read the following articles:

[ERROR] [Help 1]
http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException

[ERROR] 

[ERROR] After correcting the problems, you can resume the build with the
command

[ERROR]   mvn  -rf :flink-table-api-java

 



Flink 周期性创建watermark,200ms的周期是怎么控制的

2019-09-02 文章 venn
各位大佬, 今天看flink 指派Timestamp 和watermark 的源码,发现周期性创建
watermark 确实是周期性的,从打印到控制台的时间可以看到差不多是200毫秒执行一
次, 200毫秒是在哪里控制的,在debug 的调用栈找不到(源码位置)?

 

周期新创建watermark  方法如下:

.assignAscendingTimestamps(element =>
sdf.parse(element.createTime).getTime)

.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor[Event](Time.milliseconds(50))

 

 

生成Timestamp的方法:

TimestampsAndPeriodicWatermarksOperator 类的 :

 


@Override
public void processElement(StreamRecord element) throws Exception {
   final long newTimestamp =
userFunction.extractTimestamp(element.getValue(),
 element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

   output.collect(element.replace(element.getValue(), newTimestamp));
}

 

 

生成watermark的方法:

TimestampsAndPeriodicWatermarksOperator 类的 :


@Override
public void onProcessingTime(long timestamp) throws Exception {
   // 从这里可以看到,每200ms 打印一次
   System.out.println("timestamp : " + timestamp + ", system.current : " +
System.currentTimeMillis());
   // register next timer
   Watermark newWatermark = userFunction.getCurrentWatermark();
   if (newWatermark != null && newWatermark.getTimestamp() >
currentWatermark) {
  currentWatermark = newWatermark.getTimestamp();
  // emit watermark
  output.emitWatermark(newWatermark);
   }

   long now = getProcessingTimeService().getCurrentProcessingTime();
   getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}

 

 

 

感谢各位大佬



回复: Flink命令提交任务时是否支持配置文件与任务jar包分离

2019-08-11 文章 venn
可以分离,客户端提交的时候,初始化是在客户端上完成的,JobGraph 提交到
JobManager 之后不需要配置文件了


-邮件原件-
发件人: user-zh-return-797-wxchunjhyy=163@flink.apache.org
 代表
jinxiaolong_al...@163.com
发送时间: Saturday, August 10, 2019 12:33 AM
收件人: user-zh 
主题: Flink命令提交任务时是否支持配置文件与任务jar包分离

各位社区大佬:
   请问使用Flink命令提交任务时是否支持配置文件与任务jar包分离。
比如我的任务自身有个配置文件job.yaml,目前该配置是打到jar包中随任务提交的,可
是有时候只是要调整下配置代码没改动也要重新打包发到环境上,感觉这样不灵活,
所以我想问下能不能单独把配置文件(可能是多个文件)放到一个目录下,然后提交任务
的时候指定配置文件或者是配置目录。
类似jobManager把这些配置分发到TaskManager的classPath下这样的逻辑,这样就不用
改下配置也要重新打包发到环境上了。
倒是有个-yt参数,但是这个是用来将指定的jar包传到容器中,不适用我说的场景吧。
各位大佬请问有没有好的办法或思路,求指导。

我用的flink版本是1.7.2



jinxiaolong_al...@163.com


答复: Scala 异步 io 实现

2019-07-08 文章 venn
Thanks for your attention,  as your words the " 
org.apache.flink.streaming.api.functions.async.RichAsyncFunction " extend " 
org.apache.flink.streaming.api.functions.asyn.AsyncFunction", but Scala 
AsyncDataStream.orderedWait parameter AsyncFunction full path is " 
org.apache.flink.streaming.api.scala.async.AsyncFunction", there are different

-邮件原件-
发件人: user-zh-return-554-wxchunjhyy=163@flink.apache.org 
 代表 Lin Li
发送时间: Monday, July 8, 2019 6:51 PM
收件人: user-zh@flink.apache.org
主题: Re: Scala 异步 io 实现

Your scala asyncFunction can extends
org.apache.flink.streaming.api.functions.async.RichAsyncFunction directly.

venn  于2019年7月8日周一 下午6:47写道:

> 大佬们好:
>
> 在开发Scala 的异步io 的时候遇到点问题,Scala 
> 中没有RichAsyncFunction(Rich类有open方法,可以做初始化操作
> ),Scala 版本的 AsyncFunction 与Java 版本的AsyncFunction不是同一个类,所以不能使用Java 的
> RichAsyncFunction,请问各位大佬,怎么实现Scala 的RichAsyncFunction ?
>
> 详情如下:
>
> *Scala*中 使用 AsyncDataStream.orderedWait[IN, OUT:
> TypeInformation](  input: DataStream[IN],  asyncFunction: 
> *AsyncFunction*[IN, OUT], timeout: Long,  timeUnit: TimeUnit, 
> capacity: Int) 方法 中的 参数 AsyncFunction
> 全路径是 : *org.apache.flink.streaming.api.scala.async*.AsyncFunction 
> ,但是该类没有对应的
> RichAsyncFunction 的实现。
>
>
>
> 对应的*Java* 版本  AsyncDataStream.orderedWait[IN, OUT: TypeInformation](
> input: DataStream[IN],  asyncFunction: *AsyncFunction*[IN, OUT], timeout:
> Long,  timeUnit: TimeUnit, capacity: Int) 中参数 AsyncFunction 的全路径是 :
> *org.apache.flink.streaming.api.functions.asyn*. AsyncFunction, 有对应的
> RichAsyncFunction的实现(见下图)。
>
>
>
>
>
> 类:org.apache.flink.streaming.api.functions.async. RichAsyncFunction
>
>
>
>
>
>
>
> 非常感谢各位大佬回复
>


Scala 异步 io 实现

2019-07-08 文章 venn
大佬们好:

在开发Scala 的异步io 的时候遇到点问题,Scala 中没有
RichAsyncFunction(Rich类有open方法,可以做初始化操作 ),Scala 版本的
AsyncFunction 与Java 版本的AsyncFunction不是同一个类,所以不能使用Java 的
RichAsyncFunction,请问各位大佬,怎么实现Scala 的RichAsyncFunction ?

详情如下:

Scala中 使用 AsyncDataStream.orderedWait[IN, OUT:
TypeInformation](  input: DataStream[IN],  asyncFunction: AsyncFunction[IN,
OUT], timeout: Long,  timeUnit: TimeUnit, capacity: Int) 方法 中的 参数
AsyncFunction 全路径是 :
org.apache.flink.streaming.api.scala.async.AsyncFunction ,但是该类没有对应
的 RichAsyncFunction 的实现。

 

对应的Java 版本  AsyncDataStream.orderedWait[IN, OUT: TypeInformation](
input: DataStream[IN],  asyncFunction: AsyncFunction[IN, OUT], timeout:
Long,  timeUnit: TimeUnit, capacity: Int) 中参数 AsyncFunction 的全路径是 :
org.apache.flink.streaming.api.functions.asyn. AsyncFunction, 有对应的
RichAsyncFunction的实现(见下图)。

 

 

类:org.apache.flink.streaming.api.functions.async. RichAsyncFunction

 



 

 

非常感谢各位大佬回复