Flink-1.15 HiveDynamicTableFactory 取Catalog使用

2022-09-19 文章 yanggang_it_job
当前使用HiveDynamicTableFactory需要先声明HiveCatalog才可以使用。
请问能否把HiveDynamicTableFactory直接作为一个connector使用,或者说需要怎么调整才可以直接使用?

Flink-SQL数据倾斜处理

2020-12-14 文章 yanggang_it_job
hello,通过FlinkSQL实现了一个简单的业务:Kafka to hive
但是任务不定期报错,某个TM异常挂掉,经排查可以得到如下日志
Direct buffer memory. The direct out-of-memory error has occurred. This can 
mean two things: either job(s) require(s) a larger size of JVM direct mOpt>  or 
there is a direct memory leak. The direct memory can be allocated by user code 
or some of its dependencies. In this case 
'taskmanager.memory.task.off-heap.size' configuration option should be 
increased. Flink framework and its dependencies also consume the direct memory, 
mostly for network communication. The most of network memory is managed by 
Flink and should not result in out-of-memory error. In certain special cases, 
in particular for jobs with high parallelism, the framework may require more 
direct memory which is not managed by Flink. In this case 
'taskmanager.memory.framework.off-heap.size' configuration option should be 
increased. If the error persists then there is probably a direct memory leak in 
user code or some of its dependencies which has to be investigated and fixed. 
The task executor has to be shutdown...


可以通过两个参数进行调节,但是感觉这不是根本原因,现在怀疑是数据倾斜导致,为什么会任务是数据倾斜呢?,请看下图:
1.对内存使用曲线:

可以得出每个TM的堆内存(HeapMemory)使用相差很大。


2.直接内存曲线图

可以得出每个TM的直接内存(DirectMemory)使用相差很大。

问1:如果是数据倾斜导致的异常,那么在FlinkSQL场景中怎么处理?
问2:如果不是数据倾斜导致的,那么是什么原因导致的?解决方案是什么?

Best to you !!!

checkpoint失败讨论

2020-06-01 文章 yanggang_it_job
最近多个以rocksdb作为状态后端,hdfs作为远程文件系统的任务,频繁报错,这个报错有以下特征
1、报错之前这些任务都平稳运行,突然在某一天报错
2、当发现此类错误的时候,多个任务也会因相同的报错而导致checkpoint失败


报错信息如下
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
/user/stream/flink-checkpoints/19523bf083346eb80b409167e9b91b53/chk-43396/cef72b90-8492-4b09-8d1b-384b0ebe5768
 could only be replicated to 0 nodes instead of minReplication (=1). There are 
8 datanode(s) running and no node(s) are excluded in this operation.
at 
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1723)


辛苦大家看看
谢谢

Re:Prometheus Pushgateway 监控 Flink 问题

2020-05-12 文章 yanggang_it_job
HI 佳宸
跟你介绍下这几个参数的目的
metrics.reporter.promgateway.deleteOnShutdown:这个参数用于控制,当通过stop或者cancel下线一个任务的时候,会把pushgateway内存中缓存的指标进行清理,如果通过yarn
 kill的方式就不会清除
metrics.reporter.promgateway.randomJobNameSuffix:这个参数用于控制在我们定义的jobName后面加一个随机后缀以区别相同任务的不同container的metric_name,否则会出现覆盖写,也就是你描述的那样,指标不全的问题。原理是:当一个任务启动之后至少会有两个container(一个JM和一个TM),每个container都会往pushgateway推送指标,如果不设置这个参数为true的话,会用同一个jobName进行指标推送,那么此时后一个推送的指标就会前一个指标,就会产生一会是JM的指标,一会是TM的指标,所以要加上这个参数,那么每个container的就会不一样,这样就不会覆盖。

祝好
杨纲

















在 2020-05-12 18:25:10,"李佳宸"  写道:
>hi,大家好
>
>我在使用Prometheus Pushgateway 监控
>Flink时,metrics.reporter.promgateway.deleteOnShutdown:
>true 这一配置失效,
>Flink集群关闭时,pushgateway中仍然存有metrics数据
>reporter相关的全部配置为:
>
>metrics.reporter.promgateway.class:
>org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
>
>metrics.reporter.promgateway.host: localhost
>
>metrics.reporter.promgateway.port: 9091
>
>metrics.reporter.promgateway.jobName: myJob
>
>metrics.reporter.promgateway.randomJobNameSuffix: *true*
>
>metrics.reporter.promgateway.deleteOnShutdown: *true*
>
>
>Flink版本为1.9.1, pushgateway版本0.9 和1.2都尝试过,一样的问题。
>
>
>不知道这是不是bug,
>
>有谁有成功的案例么?
>
>谢谢


Re:Re: 任务假死

2020-04-25 文章 yanggang_it_job
感谢您的回复,这个问题和您刚才给我的场景有些相似,但还是有些许差异。
刚才试了几种方式,图片好像都无法访问。
下面我详细介绍下异常情况
1、我的任务是从三个kafka读取,然后通过onGroup实现left 
join语义,然后定义了一个滑动窗口(600,10),最后通过一个CoGroupFunction进行处理具体的数据
2、异常出现在其中一个CoGruopFunction(Window(TumblingEventTimeWindows(60), 
EventTimeTrigger, CoGroupWindowFunction) (15/200))报OOM,异常栈如下
  java.lang.OutOfMemoryError: unable to create newnative thread
at java.lang.Thread.start0(NativeMethod)
at java.lang.Thread.start(Thread.java:717)
at 
java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
at 
java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1378)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1237)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
at 
org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
at 
org.apache.flink.streaming.runtime.io.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:137)
at 
org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
   
3、除了这个算子vertice为FAILED,其他vertice都为CANCELED,JobManager状态为RUNNING


正常情况下出现这个错,JM会找一台合适的机器重新把TM启起来或者多次尝试后,任务退出。
但是现在任务的运行状态为RUNNING,虽然为RUNNING但是也不写入数据到下游存储。







thanks


在 2020-04-26 11:01:04,"Zhefu PENG"  写道:
>图好像挂了看不到。是不是和这两个场景描述比较相似
>
>[1] http://apache-flink.147419.n8.nabble.com/flink-kafka-td2386.html
>[2]  http://apache-flink.147419.n8.nabble.com/Flink-Kafka-td2390.html
>On Sun, Apr 26, 2020 at 10:58 yanggang_it_job 
>wrote:
>
>> 1、Flink-UI截图
>> 我任务的启动配置是 -p 200 -ys 5,也就是说会有40个TM运行,之前正常运行的时候确实是40个TM,而现在只有1个TM在运行;
>> 同时通过观察数据的同步时间,任务已经在两天前停止写入数据了,但是查看任务的状态却是RUNNING;
>> 我的理解是当tm申请失败,那么当前任务就会退出并把状态设置为FINISHED,但是真实情况却是上面描述那样。
>> 请问为什么会出现这种情况呢?
>>
>> thanks
>>
>>
>>
>>
>>
>>


任务假死

2020-04-25 文章 yanggang_it_job
1、Flink-UI截图
我任务的启动配置是 -p 200 -ys 5,也就是说会有40个TM运行,之前正常运行的时候确实是40个TM,而现在只有1个TM在运行;
同时通过观察数据的同步时间,任务已经在两天前停止写入数据了,但是查看任务的状态却是RUNNING;
我的理解是当tm申请失败,那么当前任务就会退出并把状态设置为FINISHED,但是真实情况却是上面描述那样。
请问为什么会出现这种情况呢?


thanks





关于使用RocksDBStateBackend TTL 配置的问题

2020-04-02 文章 yanggang_it_job
Hi:
   
我们现在启用state.backend.rocksdb.ttl.compaction.filter.enabled进行rocksdb的有效期设置,但效果并不是那么理想。
   同时我也有以下问题想不明白:
   1、如果rocksdb在compact的时候有些state并没有被compact到,是否就意味着就算这些state已经过期也不会被删除?
   2、目前flink的ttl策略只有OnCreateAndWrite和OnReadAndWrite两种策略,是否有那种不需要刷新,到了TTL时间就自动清除。
 否则就会出现state一直在刷新导致永远无法删除,最终导致磁盘打满


   目前我能想到的方案是,另外写一个定时任务根据配置去清除过期state。
   请问大家还有其他更好的方案吗?

NetworkBufferPool的使用

2020-03-26 文章 yanggang_it_job
Hi:

观察flink_taskmanager_Status_Shuffle_Netty_AvailableMemorySegments的值发现这个值很大,也就是说NetworkBufferPool还很充裕,可我的任务还是发生了背压告警。
请问各位大佬这是为什么呢?


如何提升任务cpu使用率

2020-03-24 文章 yanggang_it_job
hi:
   背景介绍,现在集群的剩余核数不多,就去梳理了一些大任务。
   
通过PromSQL:max(flink_taskmanager_Status_JVM_CPU_Load{job_name={job_name}})获取指定任务的cpu使用率,
   发现任务的cpu使用率普遍较低,一个slot为10的container,使用率大多小于6%。
   
   然后我测试中我降低container里面的slot数,发现cpu使用率并没有线性增加,同理我增大slot数也没有线性减少。
   
   我是不是测试的有问题呢?或者有什么相关思路吗?




  
 

pushgateway内存异常

2020-03-19 文章 yanggang_it_job
Hi:

向大家请教一个使用org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter推送指标到
pushgateway时,pushgateway内存使用异常的问题,具体异常如下
 1、实际我们的内存使用在8G左右,但是pushgateway的内存一直在35G左右波动
 2、pushgateway曲线波动较大,不是一条平稳的曲线,会有8G左右的波动
 
希望大家帮忙看看导致以上问题的原因,谢谢...

Re:回复:窗口去重

2019-12-11 文章 yanggang_it_job
我觉得可以这样处理:1:首先把你的stream流注册为表(不管是一个还是多个stream)2:然后对这个表使用FLINKSQL进行业务表达3:最后使用FLINK
 
SQL提供的开窗函数指定想要去重的字段注意:控制state的大小参考文档:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#deduplication
在 2019-12-11 15:53:00,"Jimmy Wong"  写道:
>属于不同的window,是window内去重,window间不去重
>
>
>| |
>Jimmy Wong
>|
>|
>wangzmk...@163.com
>|
>签名由网易邮箱大师定制
>
>
>在2019年12月11日 12:08,梁溪 写道:
>去重了为什么还会有两个2
>
>
>
>
>| |
>梁溪
>|
>|
>邮箱:lx_la...@163.com
>|
>
>签名由 网易邮箱大师 定制
>
>在2019年12月11日 11:19,Jimmy Wong 写道:
>Hi, Yuan,Youjun 谢谢。 你这种方案是 SQL 的角度吧,如果用 DataStream 算子要怎么处理呢?
>
>
>| |
>Jimmy Wong
>|
>|
>wangzmk...@163.com
>|
>签名由网易邮箱大师定制
>
>
>在2019年12月11日 09:04,Yuan,Youjun 写道:
>第一种情况,用firstvalue这种聚合函数; 第二种情况,用min聚合函数,然后group by id,是不是就是你要的结果?
>
>-邮件原件-
>发件人: Jimmy Wong 
>发送时间: Tuesday, December 10, 2019 4:40 PM
>收件人: user-zh@flink.apache.org
>主题: 窗口去重
>
>Hi,All:
>请教一个问题,现在有个实时场景:需要对每 5 分钟内数据进行去重,然后 Sink。
>比如:
>数据
>{ts: 2019-12-10 16:24:00 id: 1}
>{ts: 2019-12-10 16:22:00 id: 1}
>{ts: 2019-12-10 16:23:00 id: 2}
>{ts: 2019-12-10 16:21:00 id: 1}
>{ts: 2019-12-10 16:29:00 id: 2}
>{ts: 2019-12-10 16:27:00 id: 3}
>{ts: 2019-12-10 16:26:00 id: 2}
>
>
>第一种情景,不考虑时间去重,结果如下:
>{ts: 2019-12-10 16:24:00 id: 1}
>{ts: 2019-12-10 16:23:00 id: 2}
>{ts: 2019-12-10 16:29:00 id: 2}
>{ts: 2019-12-10 16:27:00 id: 3}
>
>
>第二种情景,考虑时间去重,结果如下:
>{ts: 2019-12-10 16:21:00 id: 1}
>{ts: 2019-12-10 16:23:00 id: 2}
>{ts: 2019-12-10 16:26:00 id: 2}
>{ts: 2019-12-10 16:27:00 id: 3}
>
>
>请教下,对于上面两种情景,分别有什么高效实时的解决方案么, 谢谢?我想了一下用 5min 窗口,和 ProcessWindowFunction 
>可以解决,但是 ProcessWindowFunction 要缓存 5min 的窗口数据,但是有延迟。
>
>
>
>
>| |
>Jimmy Wong
>|
>|
>wangzmk...@163.com
>|
>签名由网易邮箱大师定制
>


Flink StreamingFileSink.forBulkFormat to HDFS

2019-10-13 文章 yanggang_it_job
消费Kafka数据到HDFS,是否能支持ORC格式的Hive表


1. 保证EXACTLY_ONCE
2. 支持ORC格式、Snappy、ZLIB压缩

文件重命名

2019-10-08 文章 yanggang_it_job
Dear All


Flink 1.9.0


1. 使用StreamingFileSink 消费kafka数据到HDFS
2. 开启了EXACTLY_ONCE


写入hdfs的文件都是
part-{parallel-task}-{count}
这种格式


如何重命名啊?


Best

Checkpoint使用

2019-08-27 文章 yanggang_it_job
关于flink从checkpoint的问题:
   1、如果我的并行度发生了改变,怎么从checkpoint启动?
   2、是否可以动态设置checkpoint触发时间?