flink web ui 异常问题
如题,在工作中经常遇到flink任务各种异常,今天我列了下主要的异常,想请大佬们对不同异常的出现场景根据自身经验说下原因、场景、还有可能的优化解决方案。 (1) org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id {...} is no longer reachable. (2) org.apache.flink.util.FlinkException: The TaskExecutor is shutting down. (3) org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Lost connection to task manager '10.35.213.153/10.35.213.153:2085'. This indicates that the remote task manager was lost. (4) org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: readAddress(..) failed: Connection timed out (connection to '10.35.116.170/10.35.116.170:2031') (5) org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: writeAddress(..) failed: Connection reset by peer (6) java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id {...} timed out.
Flink SQL suport tfrecord format
hi,when I write a sql like this: String sqlCreate = "CREATE TABLE fs_table (\n" + " `examplestr` bytes\n" + ") WITH (\n" + " 'connector'='filesystem',\n" + " 'format'='raw',\n" + " 'path'='/tmp/zhangying480'\n" + ")"; I get an error like this,which means my tfrecord format is wrong,but it is correct: at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ... 4 more Caused by: java.io.IOException: Length header crc32 checking failed: -1837507072 != -252953760, length = 323850 at com.jd.realtime.formal.operator.source.inner.TFRecordReader.read(TFRecordReader.java:44) at com.jd.realtime.formal.operator.source.inner.TFRecordInputFormat.nextRecord(TFRecordInputFormat.java:128) at com.jd.realtime.formal.operator.source.inner.TFRecordInputFormat.nextRecord(TFRecordInputFormat.java:17) at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:202) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) at java.lang.Thread.run(Thread.java:748) I read the source code then have a result: public class SerializationSchemaAdapter implements Encoder { private static final long serialVersionUID = 1L; static final byte LINE_DELIMITER = "\n".getBytes(StandardCharsets.UTF_8)[0]; @Override public void encode(RowData element, OutputStream stream) throws IOException { checkOpened(); stream.write(serializationSchema.serialize(element)); stream.write(LINE_DELIMITER); write line_delimiter will cause the incorrect result,so I realize a TFRecordSerializationSchemaAdapter and I override the FileSystemTableSink,then I fix the problem So far,I only suport filesystemsink with tfrecord format,do you think It is necessary to support other system?
Re: flink hybrid source问题
你好,hybrid source 现在需要基于 FLIP-27 source 来实现(如:FileSource, KafkaSource),对于非 FLIP-27 source 需要做一些修改后才可以使用。如果想参与 hybird source 的扩展,可以在 slack 中加入flink社群,并发起讨论。 关于 source 相关的文档,可以查看官网和 flip 设计和讨论页面( https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source )( https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface )。希望能帮到你。 casel.chen 于2022年9月19日周一 19:42写道: > 我有一个flink实时计算场景是需要先从MaxCompute读取一张表的存量数据,再从相应的kafka topic读取增量数据,一并进行计算处理。 > 看了一下需要用到hybrid source,目前最新flink社区版提供了Kafka/Hive/File > Source,其他数据源的source是需要自己开发吗?社区有没有一个贡献source的地方? > 有没有介绍如何自定义基于新版source架构的source文章或博客呢?谢谢!
Re: flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游, 设计的考虑是啥?
你好。 Flink 采用的是 pull 模型。pull 模型的优点在于:1. 其具有更好的扩展性(下游的消费者可以根据需求增加,只需要获取到上游的消费位点); 2. 下游的消费者可以根据需求来调整消费速率; 3.网络传输,flink 以前也尝试使用过push模型,且为了节约开销,进程间是复用 TCP连接,一个 task 线程的性能瓶颈将导致整条链路的所有 task 线程不能接收数据,影响整体的数据消费速率。 push模型的优点:消耗较小,不需要设计机制来一直轮训观察上游节点的数据情况。 Xuyang 于2022年9月9日周五 20:35写道: > Hi,主要是pull模型:下游主动拉取上游的数据。可以在下游的消费能力达到极限时,通过反压机制,让上游减少生产的数据。 > > > > > > > > -- > > Best! > Xuyang > > > > > > 在 2022-09-09 19:04:27,"郑 致远" 写道: > >各位大佬好 > >请教下, > >flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游, 这么设计的考虑是啥呢? >
Flink SQL suport tfrecord format
hi,when I write a sql like this: String sqlCreate = "CREATE TABLE fs_table (\n" + " `examplestr` bytes\n" + ") WITH (\n" + " 'connector'='filesystem',\n" + " 'format'='raw',\n" + " 'path'='/tmp/zhangying480'\n" + ")"; I get an error like this,which means my tfrecord format is wrong,but it is correct: at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ... 4 more Caused by: java.io.IOException: Length header crc32 checking failed: -1837507072 != -252953760, length = 323850 at com.jd.realtime.formal.operator.source.inner.TFRecordReader.read(TFRecordReader.java:44) at com.jd.realtime.formal.operator.source.inner.TFRecordInputFormat.nextRecord(TFRecordInputFormat.java:128) at com.jd.realtime.formal.operator.source.inner.TFRecordInputFormat.nextRecord(TFRecordInputFormat.java:17) at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:202) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) at java.lang.Thread.run(Thread.java:748) I read the source code then have a result: public class SerializationSchemaAdapter implements Encoder { private static final long serialVersionUID = 1L; static final byte LINE_DELIMITER = "\n".getBytes(StandardCharsets.UTF_8)[0]; @Override public void encode(RowData element, OutputStream stream) throws IOException { checkOpened(); stream.write(serializationSchema.serialize(element)); stream.write(LINE_DELIMITER); write line_delimiter will cause the incorrect result,so I realize a TFRecordSerializationSchemaAdapter and I override the FileSystemTableSink,then I fix the problem So far,I only suport filesystemsink with tfrecord format,do you think It is necessary to support other system?
Re: flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游, 设计的考虑是啥?
Hi 我个人觉得简单的说flink数据传输是pull模型可能会有歧义,一般来讲大家理解的两个模型的执行流程如下 1. push模型 上下游计算任务将初始化网络连接后,上游计算任务直接通过连接不断向下游"push"数据 2. pull模型 上下游计算任务初始化网络连接后,下游计算任务根据自己的计算进度,轮询向上游发送请求“pull”数据,执行下一轮计算 在flink里,上下游交互流程主要分为几个步骤 1. 上游计算任务所在的TM创建一个Netty Server 2. 下游计算任务启动时通过Netty Client跟上游创建连接 3. 下游计算任务向上游发送一个partition request请求,上游根据request请求创建数据reader,通过reader不断读取数据并通过连接发送数据 4. 上下游计算任务分别有自己的内存池子,用于流控,大概流程如下 a) 下游计算任务根据数据消费内存池子情况,不定期向上游计算任务更新授信(credit) b) 上游计算任务根据接收到的credit消息,更新本地管理的授信大小 c) 上游计算任务根据本地授信大小不断向下游计算任务发送数据 通过这种方式,在资源足够的情况下,可以保证数据传输是完全流式的,这跟传统的pull模型不同,可能更像是支持授信流控机制的push模型 On Wed, Sep 21, 2022 at 9:43 AM yh z wrote: > 你好。 Flink 采用的是 pull 模型。pull 模型的优点在于:1. > 其具有更好的扩展性(下游的消费者可以根据需求增加,只需要获取到上游的消费位点); 2. 下游的消费者可以根据需求来调整消费速率; > 3.网络传输,flink 以前也尝试使用过push模型,且为了节约开销,进程间是复用 TCP连接,一个 task 线程的性能瓶颈将导致整条链路的所有 > task 线程不能接收数据,影响整体的数据消费速率。 push模型的优点:消耗较小,不需要设计机制来一直轮训观察上游节点的数据情况。 > > Xuyang 于2022年9月9日周五 20:35写道: > > > Hi,主要是pull模型:下游主动拉取上游的数据。可以在下游的消费能力达到极限时,通过反压机制,让上游减少生产的数据。 > > > > > > > > > > > > > > > > -- > > > > Best! > > Xuyang > > > > > > > > > > > > 在 2022-09-09 19:04:27,"郑 致远" 写道: > > >各位大佬好 > > >请教下, > > >flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游, 这么设计的考虑是啥呢? > > >
Re: 某作业计算算子处于busy状态
flink内存泄漏有什么排查的指标或者工具吗? 比如大致定位泄漏的位置之类的。 > 在 2022年9月19日,下午5:41,yidan zhao 写道: > > 那你代码检查下有没有内存泄露呢。 > > 杨扬 于2022年9月19日周一 11:21写道: >> >> 还有一个现象,观察到 >> taskHeap内存占用在逐步升高,作业刚启动的时候占用在10%左右,一周后增加至25%左右,两周后增加至50%左右,上述指的是GC后观察到的内存占用值。两周后计算算子几乎一直100%busy状态,端到端延迟已经达到了10s左右,作业已经不可用需要重启了。 >> >> >> >> >>> 在 2022年9月15日,下午8:58,yidan zhao 写道: >>> >>> 本身低延迟一定程度上就是靠“资源低利用率”实现的。资源高利用率情况,就是尽可能满负荷够用就行的意思。 >>> >>> yidan zhao 于2022年9月15日周四 20:57写道: 资源足够,busy 50%+,延迟如果也可接受的话,其实就不算问题。2s延迟不算高。 杨扬 于2022年9月15日周四 20:02写道: > > 目前并发度已经设定为25,每个slot内存为4G,已经使用100G内存,峰值流量1TPS左右,资源是足够的吧? > > > > >> 在 2022年9月15日,下午7:27,yidan zhao 写道: >> >> busy那就提升并发度看看效果? >> >> 杨扬 mailto:yangya...@cupdata.com>> 于2022年9月15日周四 >> 14:51写道: >> 各位好! >> 目前有一flink作业,大致分为3个阶段: >> 读取kafka中数据(1个source,并行度3)-> 进行数据筛选和条件判断(没有窗口操作,并行度25)-> >> 结果写入kafka(20多个sink,每个sink并行度3)。可参考附件图片。 >> >> 目前存在的问题是:作业在运行一段时间后,中间25并行度的一系列计算算子会变为busy状态(会达到50%以上),端到端的信息延迟增加,偶尔延迟会达到2秒以上。此时作业日志并没有报错、异常、告警等信息。 >> >> >> 上述问题因为没有日志异常告警信息,本人有些无从下手解决。猜测是否因为sink数据量太多且每个sink并行度都是3会导致中间25个并行度的一系列算子和sink之间的交互产生大量shuffle引起?望各位大佬帮忙分析一下这个问题 >> >> >> >> === >> 此邮件已由 Deep Discovery Email Inspector 进行了分析。 > >>> >>> === >>> 此邮件已由 Deep Discovery Email Inspector 进行了分析。 >> > > === > 此邮件已由 Deep Discovery Email Inspector 进行了分析。