回复:flink-1.12 注册udf问题

2020-12-25 文章 kcz
是使用时候没有匹配参数个数问题,已经解决。





-- 原始邮件 --
发件人: kcz <573693...@qq.com>
发送时间: 2020年12月26日 15:24
收件人: user-zh 

flink-1.12 注册udf问题

2020-12-25 文章 kcz
使用了
createTemporarySystemFunctiom来注册udf,使用时候no.match.found.for.function,目前是手机,不太方便粘贴更多信息。

flink12同时使用flink-connector-kafka和flink-sql-connector-kafka会引发类冲突问题

2020-12-25 文章 site
在yarn中部署streaming程序,maven使用依赖

org.apache.flink
flink-connector-kafka-2.2_2.11
1.12.0

在flink的lib目录中有flink-sql-connector-kafka_2.11-1.12.0.jar,因为类冲突问题会引起在yarn中程序部署失败,flink默认类加载机制child-first,改为parent-first也样,类冲突问题可以参考http://apache-flink.147419.n8.nabble.com/Kafka-Consumer-td3475.html
org.apache.kafka.common.KafkaException: 
org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
 is not an instance of org.apache.kafka.common.serialization.Deserializer


不会提ISSUE,还请社区修复

请教flink的处理性能,关于cpu的

2020-12-25 文章 x2009438

想请教大佬们一下。谢谢!

我有一个处理逻辑很简单的job,从kafka消费数据,分组后(分组个数不大,大约2-3万个key)按分钟累加后统计10多个指标的均值,用的是aggregate 
function,结果最后写入kafka。数据量大约6-8W/s,job总体并行度设为10,目前启动3个yarn 
container作为tm每个2G内存,机器均为8C32G的虚机,每台上一个tm进程,目前观察到内存占用低,但%cpu会到200%多。

我是不是可以理解为我这个job总的需要占用
6-7个cpu核的资源,也就是说1C的性能,flink处理的数据大概在1W/s。这样的性能算是正常吗?还是我代码有问题

昨天晚上上游突然数据量翻了个倍,观察到进程cpu占用增长到接近400%。这是因为yarn不限制cpu使用吗?那如果数据量再翻一倍,cpu不就打满了吗,就太糟糕了。


发自我的iPhone

Re: Flink 操作hive 一些疑问

2020-12-25 文章 Rui Li
不太确定是不是能整合到一个job里,你可以试试看用StatementSet能否实现,比如添加两条INSERT语句,一条是写入hive,一条是从hive里查询数据把结果写到其他的表。

On Thu, Dec 24, 2020 at 4:35 PM Jacob <17691150...@163.com> wrote:

> Hi,
>
> 谢谢回复
>
> 对,也可以这样理解,总体分为两部分,先处理流消息,每隔15min写进hive表。然后再做mapreduce处理上步15min的数据。
>
> 目前的现状是:
> 第一步用flink处理,第二步是一个定时job去处理上一步的数据。
>
> 改善计划:
>
> 想整合这两步,都使用flin处理,flink新版本对hive有支持,就不用再使用MapReduce了,现在就是不知道怎样平滑地在同一个Job中执行。
>
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best regards!
Rui Li


Re: Flink catalog+hive问题

2020-12-25 文章 Rui Li
Hi,

你贴的是HDFS的权限控制,那应该就是基于storage的了。可以在HMS端开启验证,这样HiveCatalog去连接HMS的时候会生效。开启方式参考官网:
https://cwiki.apache.org/confluence/display/Hive/Storage+Based+Authorization+in+the+Metastore+Server

On Thu, Dec 24, 2020 at 2:14 PM 19916726683 <19916726...@163.com> wrote:

> 可以参考下这个
>
> https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsPermissionsGuide.html
> 贴的代码是org.apache.hadoop.hive.io.HdfsUtils 的setFullFileStatus 方法
> Original Message
> Sender:Rui lilirui.fu...@gmail.com
> Recipient:user-zhuser...@flink.apache.org
> Date:Thursday, Dec 24, 2020 11:33
> Subject:Re: Flink catalog+hive问题
>
>
> Hello,
> 你贴的图看不到了。可以贴一下参考的官网链接。hive至少支持三种不同的authorization模式,flink目前对接hive时只有用storage
> based authorization会生效。 On Thu, Dec 24, 2020 at 10:51 AM 19916726683
> 19916726...@163.com wrote:  hive的官网有介绍ACL,如何继承权限关系。源码在Hive- HDFSUtils类中
> 核心代码应该是上面的这点。   Original Message  *Sender:* Rui lilirui.fu...@gmail.com
> *Recipient:* user-zhuser...@flink.apache.org  *Date:* Wednesday, Dec 23,
> 2020 19:41  *Subject:* Re: Flink catalog+hive问题
>  hive的ACL用的是哪种呢?目前flink没有专门做ACL的对接,只有HMS端storage based authorization [1]
> 会生效   [1]
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Authorization#LanguageManualAuthorization-1StorageBasedAuthorizationintheMetastoreServer
>  On Wed, Dec 23, 2020 at 4:34 PM 19916726683 19916726...@163.com wrote:
>   spark是可以通过配置来确定是用hive的acl还是用自己的acl,不清楚flink是不是也是这种模式   Original
> Message   Sender:guaishushu1103@163.comguaishushu1...@163.com
> Recipient:user-zhuser...@flink.apache.org   Date:Wednesday, Dec 23,
> 2020 15:53   Subject:Flink catalog+hive问题   在用flink
>  catalog+hive做元数据持久化的时候,发现hive的ACL权限没有起作用,麻烦问下知道的大佬,flink是会直接跳过hive的ACL权限吗?
>  guaishushu1...@163.com --  Best regards!  Rui Li   -- Best regards!
> Rui Li



-- 
Best regards!
Rui Li


求问为什么KafkaDynamicSource 在批模式下不能运行

2020-12-25 文章 wxpcc
kafka在我们这边场景上除了用来存放实时流式数据,还会用作临时大数据量的存储,主要用于:

1.
数据同步时,将全量数据同步到一个临时的kafka中,增量数据持续性同步到kafka中,目前我们都使用流模式消费其中的数据,就会有手动停止,或者借助指标等自动停止流式任务
2. 数据恢复时
3. 临时查看某个时间区间的数据

如果批模式 sql能够完成这些事情的话那该多好



--
Sent from: http://apache-flink.147419.n8.nabble.com/

关于iterative Job的检查点问题

2020-12-25 文章 赵一旦
如下
Exception in thread "main" java.lang.UnsupportedOperationException:
Checkpointing is currently not supported by default for iterative jobs, as
we cannot guarantee exactly once semantics. State checkpoints happen
normally, but records in-transit during the snapshot will be lost upon
failure.
The user can force enable state checkpoints with the reduced guarantees by
calling: env.enableCheckpointing(interval,true)
。
Flink当前针对IterativeJob不支持检查点,但可以强制开启。

我需要知道的是,强制开启后,对状态影响多大?无法保证没问题的是哪部分状态,还是对普通流算子的状态也会受到影响呢?


or影响只是iterative的feedback部分?还是什么部分?


Can flink kafka producer dynamically discover new parititons after expansion??

2020-12-25 文章 ??????????
hi??I have a source Kafka and a sink Kafka??when the amount of data processing 
grows??I need to expand Kafka topic's  partition number ,but I don't want 
to restart the job to take effect.
for source Kafka, I use flink.partition-discovery.interval-millis and it could 
consume the new parititon after I expand the Kafka topic's partition 
number.


but sink kafka don't work like this??


The flink kafka producer get topic's paritition list and cache in 
topicPartitionsMap as showed In Class FlinkKafkaProducer

Can flink kafka producer dynamically discover new parititons after expansion??

2020-12-25 文章 ??????????
hi??I have a source Kafka and a sink Kafka??when the amount of data processing 
grows??I need to expand Kafka topic's  partition number ,but I don't want 
to restart the job to take effect.
for source Kafka, I use flink.partition-discovery.interval-millis and it could 
consume the new parititon after I expand the Kafka topic's partition 
number.


but sink kafka don't work like this??


The flink kafka producer get topic's paritition list and cache in 
topicPartitionsMap as showed In Class FlinkKafkaProducer

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-12-25 文章 Storm☀️
在测试环境:
关闭增量chk,全量的state大小大约在:100M左右;
之前开启:我观察了一段时间,膨胀到5G,而且还一直在增长;
sql:
select sum(xx) group by 1 分钟窗口

过期时间设置的为:5-30min



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re: Flink读取kafka没有报错也没有数据输出,Kafka消费端有数据,谢谢

2020-12-25 文章 Appleyuchi
解决了,谢谢

















在 2020-12-25 14:48:57,"Leonard Xu"  写道:
>Hi,
>
>> 因为听说executeSql会提交任务,所以把最后一句execute给注销了。 
>
>
>> val result: Table = tEnv.sqlQuery(query)
>> tEnv.toRetractStream[Row](result).print()
>> //tEnv.execute("Flink SQL DDL")
>
>DataStream程序的执行和Table/SQL程序的执行是解耦的,已经通过 tEnv.toRetractStream 转成 DataStrean 
>的程序后,需要调用 bsEnv.execute("test")
>
>如果需要直接用SQL,可以直接:
>tEnv.executeSql(query).print();
>
>转换成datastream后应该类似这样
>val result: Table = tEnv.sqlQuery(query)
>tEnv.toRetractStream[Row](result).print()
>bsEnv.execute("test”)
>
> 
>
>祝好,
>Leonard


Re: Flink TaskManager失败的日志关键词

2020-12-25 文章 r pp
  嗨~ 从flink 的启动 sh 文件里面可以看到,启动java 虚拟机的时候,就设置好 日志文件名了。改了名字,这次的JOB

https://github.com/apache/flink/pull/11839/files

FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${SERVICE}-
${id}-${HOSTNAME}"
log="${FLINK_LOG_PREFIX}.log"
log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=fil 
exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}"  

在启动一个jvm 时,日志配置信息已经写好了
eg:
java  -Xmx1073741824 -Xms1073741824 -XX:MaxMetaspaceSize=268435456
 -Dlog.file=/

root/flink-1.12.0/log/flink-root-standalonesession-0-iZ0jli08ce7m36qzwgalk4Z.log
。。。

zhuxiaoshang  于2020年12月25日周五 下午4:53写道:

> Hi,
> 一般搜索Exception、Error、Fail之类的吧,如果是TM因为内存超用被kill的话 可以搜索container、kill之类的关键字
>
> > 2020年12月25日 下午1:43,赵一旦  写道:
> >
> > 如题,有人知道关键词吗,每次失败日志太多哦。
> > 显示各种task的cancel等。
> > 最后突然就失败了。。。
> >
> > 目前感觉经常是因为cancel(180s)。导致Task did not exit gracefully within 180 +
> seconds。
> >
> >
> > 此外,大家生产中会修改日志格式和日志文件吗。我调整了之后WEB-UI上那个日志从来没能看过。现在虽然有个日志list,但点了也没效果。
> >
> > 我调整了日志文件名。
>
>


Re: Flink-1.11.1流写filesystem分区提交问题

2020-12-25 文章 zhuxiaoshang


Hi,
文件数是和并发有关的,一个并发一次至少写一个文件,还和文件滚动大小有关。


> 2020年12月25日 下午2:10,amen...@163.com 写道:
> 
> 想请问下,写filesystem的时候依赖checkpoint进行commit,那么做完一次checkpoint的时候可提交的文件数是由并行度parallelism数决定的吗?我发现我的文件提交数都是3个3个的当每次chk结束后。
> 
> 
> 
> 
> 发件人: amen...@163.com
> 发送时间: 2020-12-24 18:47
> 收件人: user-zh
> 主题: Re: Re: Flink-1.11.1流写filesystem分区提交问题
> 一语点醒梦中人,谢谢回复@冯嘉伟
> 
> 因为我是先在sql-client中进行的提交测试,因此忽略了这个问题,谢谢
> 
> best,
> amenhub
> 
> 
> 
> 发件人: 冯嘉伟
> 发送时间: 2020-12-24 18:39
> 收件人: user-zh
> 主题: Re: Flink-1.11.1流写filesystem分区提交问题
> 有开启checkpoint吗?
> Part files can be in one of three states:
> In-progress : The part file that is currently being written to is
> in-progress
> Pending : Closed (due to the specified rolling policy) in-progress files
> that are waiting to be committed
> Finished : On successful checkpoints (STREAMING) or at the end of input
> (BATCH) pending files transition to “Finished”
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html
> 
>   
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: Flink TaskManager失败的日志关键词

2020-12-25 文章 zhuxiaoshang
Hi,
一般搜索Exception、Error、Fail之类的吧,如果是TM因为内存超用被kill的话 可以搜索container、kill之类的关键字

> 2020年12月25日 下午1:43,赵一旦  写道:
> 
> 如题,有人知道关键词吗,每次失败日志太多哦。
> 显示各种task的cancel等。
> 最后突然就失败了。。。
> 
> 目前感觉经常是因为cancel(180s)。导致Task did not exit gracefully within 180 + seconds。
> 
> 
> 此外,大家生产中会修改日志格式和日志文件吗。我调整了之后WEB-UI上那个日志从来没能看过。现在虽然有个日志list,但点了也没效果。
> 
> 我调整了日志文件名。



checkpoint持久化问题

2020-12-25 文章 chen310
问题:
flink sql中设置了job挂掉后checkpoint保留

execution.checkpointing.externalized-checkpoint-retention
RETAIN_ON_CANCELLATION

并且配置了checkpoint保存到hdfs上

state.backend rocksdb

#增量checkpoint
#state.backend.incremental true
state.checkpoints.dir hdfs:///tmp/flink/checkpoint

flink实际也做了checkpoint,但是用这个路径去hdfs上查询,并不存在对应的路径的目录,好像并不是每次做checkpoint都会持久化到hdfs上,这个是要做啥配置么?让每次checkpoint都保存到磁盘


 



--
Sent from: http://apache-flink.147419.n8.nabble.com/