flink web ui 异常问题

2022-09-20 Thread yidan zhao
如题,在工作中经常遇到flink任务各种异常,今天我列了下主要的异常,想请大佬们对不同异常的出现场景根据自身经验说下原因、场景、还有可能的优化解决方案。

(1) org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager
with id {...} is no longer reachable.
(2) org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
(3) 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Lost connection to task manager '10.35.213.153/10.35.213.153:2085'.
This indicates that the remote task manager was lost.
(4) org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
readAddress(..) failed: Connection timed out (connection to
'10.35.116.170/10.35.116.170:2031')
(5) 
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
writeAddress(..) failed: Connection reset by peer
(6) java.util.concurrent.TimeoutException: Heartbeat of TaskManager
with id {...} timed out.


Flink SQL suport tfrecord format

2022-09-20 Thread 张颖
hi,when I write a sql like this:




String sqlCreate = "CREATE TABLE fs_table (\n" +
"  `examplestr` bytes\n" +
")  WITH (\n" +
"  'connector'='filesystem',\n" +
"  'format'='raw',\n" +
"  'path'='/tmp/zhangying480'\n" +
")";
 I get an error like this,which means my tfrecord format is wrong,but it is 
correct:
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: java.io.IOException: Length header crc32 checking failed: 
-1837507072 != -252953760, length = 323850
at 
com.jd.realtime.formal.operator.source.inner.TFRecordReader.read(TFRecordReader.java:44)
at 
com.jd.realtime.formal.operator.source.inner.TFRecordInputFormat.nextRecord(TFRecordInputFormat.java:128)
at 
com.jd.realtime.formal.operator.source.inner.TFRecordInputFormat.nextRecord(TFRecordInputFormat.java:17)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:202)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)


I read the source code then have a result:
public class SerializationSchemaAdapter implements Encoder {

private static final long serialVersionUID = 1L;

static final byte LINE_DELIMITER = "\n".getBytes(StandardCharsets.UTF_8)[0];


@Override
public void encode(RowData element, OutputStream stream) throws IOException {
checkOpened();
stream.write(serializationSchema.serialize(element));
stream.write(LINE_DELIMITER);


write line_delimiter will cause the incorrect result,so I realize a 
TFRecordSerializationSchemaAdapter
and I override the FileSystemTableSink,then I fix the problem


  
So far,I only suport filesystemsink with tfrecord format,do you think It is 
necessary to support other system?

















Re: flink hybrid source问题

2022-09-20 Thread yh z
你好,hybrid source 现在需要基于 FLIP-27 source 来实现(如:FileSource, KafkaSource),对于非
FLIP-27 source 需要做一些修改后才可以使用。如果想参与 hybird source 的扩展,可以在 slack
中加入flink社群,并发起讨论。 关于 source 相关的文档,可以查看官网和 flip 设计和讨论页面(
https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
)(
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
)。希望能帮到你。

casel.chen  于2022年9月19日周一 19:42写道:

> 我有一个flink实时计算场景是需要先从MaxCompute读取一张表的存量数据,再从相应的kafka topic读取增量数据,一并进行计算处理。
> 看了一下需要用到hybrid source,目前最新flink社区版提供了Kafka/Hive/File
> Source,其他数据源的source是需要自己开发吗?社区有没有一个贡献source的地方?
> 有没有介绍如何自定义基于新版source架构的source文章或博客呢?谢谢!


Re: flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游, 设计的考虑是啥?

2022-09-20 Thread yh z
你好。 Flink 采用的是 pull 模型。pull 模型的优点在于:1.
其具有更好的扩展性(下游的消费者可以根据需求增加,只需要获取到上游的消费位点); 2. 下游的消费者可以根据需求来调整消费速率;
3.网络传输,flink 以前也尝试使用过push模型,且为了节约开销,进程间是复用 TCP连接,一个 task 线程的性能瓶颈将导致整条链路的所有
task 线程不能接收数据,影响整体的数据消费速率。 push模型的优点:消耗较小,不需要设计机制来一直轮训观察上游节点的数据情况。

Xuyang  于2022年9月9日周五 20:35写道:

> Hi,主要是pull模型:下游主动拉取上游的数据。可以在下游的消费能力达到极限时,通过反压机制,让上游减少生产的数据。
>
>
>
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
> 在 2022-09-09 19:04:27,"郑 致远"  写道:
> >各位大佬好
> >请教下,
> >flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游,  这么设计的考虑是啥呢?
>


Flink SQL suport tfrecord format

2022-09-20 Thread 张颖
hi,when I write a sql like this:




String sqlCreate = "CREATE TABLE fs_table (\n" +
"  `examplestr` bytes\n" +
")  WITH (\n" +
"  'connector'='filesystem',\n" +
"  'format'='raw',\n" +
"  'path'='/tmp/zhangying480'\n" +
")";
 I get an error like this,which means my tfrecord format is wrong,but it is 
correct:
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: java.io.IOException: Length header crc32 checking failed: 
-1837507072 != -252953760, length = 323850
at 
com.jd.realtime.formal.operator.source.inner.TFRecordReader.read(TFRecordReader.java:44)
at 
com.jd.realtime.formal.operator.source.inner.TFRecordInputFormat.nextRecord(TFRecordInputFormat.java:128)
at 
com.jd.realtime.formal.operator.source.inner.TFRecordInputFormat.nextRecord(TFRecordInputFormat.java:17)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:202)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)


I read the source code then have a result:
public class SerializationSchemaAdapter implements Encoder {

private static final long serialVersionUID = 1L;

static final byte LINE_DELIMITER = "\n".getBytes(StandardCharsets.UTF_8)[0];


@Override
public void encode(RowData element, OutputStream stream) throws IOException {
checkOpened();
stream.write(serializationSchema.serialize(element));
stream.write(LINE_DELIMITER);


write line_delimiter will cause the incorrect result,so I realize a 
TFRecordSerializationSchemaAdapter
and I override the FileSystemTableSink,then I fix the problem


  
So far,I only suport filesystemsink with tfrecord format,do you think It is 
necessary to support other system?

















Re: flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游, 设计的考虑是啥?

2022-09-20 Thread Shammon FY
Hi
我个人觉得简单的说flink数据传输是pull模型可能会有歧义,一般来讲大家理解的两个模型的执行流程如下
1. push模型
上下游计算任务将初始化网络连接后,上游计算任务直接通过连接不断向下游"push"数据
2. pull模型
上下游计算任务初始化网络连接后,下游计算任务根据自己的计算进度,轮询向上游发送请求“pull”数据,执行下一轮计算

在flink里,上下游交互流程主要分为几个步骤
1. 上游计算任务所在的TM创建一个Netty Server
2. 下游计算任务启动时通过Netty Client跟上游创建连接
3. 下游计算任务向上游发送一个partition
request请求,上游根据request请求创建数据reader,通过reader不断读取数据并通过连接发送数据
4. 上下游计算任务分别有自己的内存池子,用于流控,大概流程如下
a) 下游计算任务根据数据消费内存池子情况,不定期向上游计算任务更新授信(credit)
b) 上游计算任务根据接收到的credit消息,更新本地管理的授信大小
c) 上游计算任务根据本地授信大小不断向下游计算任务发送数据

通过这种方式,在资源足够的情况下,可以保证数据传输是完全流式的,这跟传统的pull模型不同,可能更像是支持授信流控机制的push模型

On Wed, Sep 21, 2022 at 9:43 AM yh z  wrote:

> 你好。 Flink 采用的是 pull 模型。pull 模型的优点在于:1.
> 其具有更好的扩展性(下游的消费者可以根据需求增加,只需要获取到上游的消费位点); 2. 下游的消费者可以根据需求来调整消费速率;
> 3.网络传输,flink 以前也尝试使用过push模型,且为了节约开销,进程间是复用 TCP连接,一个 task 线程的性能瓶颈将导致整条链路的所有
> task 线程不能接收数据,影响整体的数据消费速率。 push模型的优点:消耗较小,不需要设计机制来一直轮训观察上游节点的数据情况。
>
> Xuyang  于2022年9月9日周五 20:35写道:
>
> > Hi,主要是pull模型:下游主动拉取上游的数据。可以在下游的消费能力达到极限时,通过反压机制,让上游减少生产的数据。
> >
> >
> >
> >
> >
> >
> >
> > --
> >
> > Best!
> > Xuyang
> >
> >
> >
> >
> >
> > 在 2022-09-09 19:04:27,"郑 致远"  写道:
> > >各位大佬好
> > >请教下,
> > >flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游,  这么设计的考虑是啥呢?
> >
>


Re: 某作业计算算子处于busy状态

2022-09-20 Thread 杨扬
flink内存泄漏有什么排查的指标或者工具吗?
比如大致定位泄漏的位置之类的。





> 在 2022年9月19日,下午5:41,yidan zhao  写道:
> 
> 那你代码检查下有没有内存泄露呢。
> 
> 杨扬  于2022年9月19日周一 11:21写道:
>> 
>> 还有一个现象,观察到 
>> taskHeap内存占用在逐步升高,作业刚启动的时候占用在10%左右,一周后增加至25%左右,两周后增加至50%左右,上述指的是GC后观察到的内存占用值。两周后计算算子几乎一直100%busy状态,端到端延迟已经达到了10s左右,作业已经不可用需要重启了。
>> 
>> 
>> 
>> 
>>> 在 2022年9月15日,下午8:58,yidan zhao  写道:
>>> 
>>> 本身低延迟一定程度上就是靠“资源低利用率”实现的。资源高利用率情况,就是尽可能满负荷够用就行的意思。
>>> 
>>> yidan zhao  于2022年9月15日周四 20:57写道:
 
 资源足够,busy 50%+,延迟如果也可接受的话,其实就不算问题。2s延迟不算高。
 
 杨扬  于2022年9月15日周四 20:02写道:
> 
> 目前并发度已经设定为25,每个slot内存为4G,已经使用100G内存,峰值流量1TPS左右,资源是足够的吧?
> 
> 
> 
> 
>> 在 2022年9月15日,下午7:27,yidan zhao  写道:
>> 
>> busy那就提升并发度看看效果?
>> 
>> 杨扬 mailto:yangya...@cupdata.com>> 于2022年9月15日周四 
>> 14:51写道:
>> 各位好!
>> 目前有一flink作业,大致分为3个阶段:
>> 读取kafka中数据(1个source,并行度3)-> 进行数据筛选和条件判断(没有窗口操作,并行度25)-> 
>> 结果写入kafka(20多个sink,每个sink并行度3)。可参考附件图片。
>> 
>> 目前存在的问题是:作业在运行一段时间后,中间25并行度的一系列计算算子会变为busy状态(会达到50%以上),端到端的信息延迟增加,偶尔延迟会达到2秒以上。此时作业日志并没有报错、异常、告警等信息。
>> 
>> 
>> 上述问题因为没有日志异常告警信息,本人有些无从下手解决。猜测是否因为sink数据量太多且每个sink并行度都是3会导致中间25个并行度的一系列算子和sink之间的交互产生大量shuffle引起?望各位大佬帮忙分析一下这个问题
>> 
>> 
>> 
>> ===
>> 此邮件已由 Deep Discovery Email Inspector 进行了分析。
> 
>>> 
>>> ===
>>> 此邮件已由 Deep Discovery Email Inspector 进行了分析。
>> 
> 
> === 
> 此邮件已由 Deep Discovery Email Inspector 进行了分析。