Re: In Flink batch mode: The rpc invocation size 113602196 exceeds the maximum akka framesize

2020-09-29 Thread zheng faaron
Hi,

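Please check whether this parameter (the akka framesize) is set correctly. You can also look on the JobManager page to see whether the parameter shows up there. I ran into a similar problem before, and setting this parameter solved it.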

Best,
Faaron Zheng


From: jy l 
Sent: Monday, September 28, 2020 4:57:46 PM
To: user-zh@flink.apache.org 
Subject: Re: In Flink batch mode: The rpc invocation size 113602196 exceeds the 
maximum akka framesize

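If you use an operator such as print(), the entire result of the upstream task is pulled over in one go. The error occurs when the pulled data is larger than the akka framesize.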

李加燕 wrote on Monday, September 28, 2020 at 3:07 PM:

> I am running Flink in batch mode to read files on HDFS and do a word count,
> but the task keeps running and never finishes. The TaskManager log shows the
> following exception:
> java.lang.reflect.UndeclaredThrowableException: null
> at com.sun.proxy.$Proxy35.updateTaskExecutionState(Unknown Source)
> ~[?:?]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1558)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1588)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2400(TaskExecutor.java:173)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1921)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> Caused by: java.io.IOException: The rpc invocation size 113602196 exceeds
> the maximum akka framesize.
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:79)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> ... 28 more
> I tried setting the akka framesize to 30M in flink-conf.yaml, but that did not
> solve the problem.
> Any help would be appreciated.
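
A rough check of the numbers in the quoted message: the rejected RPC payload is 113602196 bytes, which is a little over 108 MiB, so a 30M frame size is still too small. The Java sketch below does that arithmetic. The class and method names are hypothetical, "30M" is assumed to mean 30 MiB, and the `akka.framesize` value format (a byte count with a `b` suffix, as in the default `10485760b`) comes from the Flink configuration documentation, not from this thread.

```java
public class FrameSizeCheck {
    // Size reported by the exception in the quoted log.
    public static final long RPC_INVOCATION_BYTES = 113_602_196L;
    // Assumption: "30M" in the quoted message means 30 MiB.
    public static final long CONFIGURED_FRAME_BYTES = 30L * 1024 * 1024;

    /** Returns true if a payload of the given size fits within the frame size. */
    public static boolean fits(long payloadBytes, long frameBytes) {
        return payloadBytes <= frameBytes;
    }

    /** Smallest whole number of MiB that can hold the payload (rounds up). */
    public static long minMiB(long payloadBytes) {
        long mib = 1024L * 1024;
        return (payloadBytes + mib - 1) / mib;
    }

    public static void main(String[] args) {
        System.out.println("fits in 30 MiB: "
                + fits(RPC_INVOCATION_BYTES, CONFIGURED_FRAME_BYTES));
        long needed = minMiB(RPC_INVOCATION_BYTES);
        System.out.println("minimum frame size: " + needed + " MiB");
        // Candidate flink-conf.yaml line; the real limit may need extra headroom.
        System.out.println("akka.framesize: " + (needed * 1024 * 1024) + "b");
    }
}
```

This only shows why 30M was not enough. Raising the frame size treats the symptom; if the payload comes from print() collecting the whole result, as suggested above in the thread, writing the result to a sink avoids shipping it over RPC at all.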


Why setAllVerticesInSameSlotSharingGroupByDefault is set to false in batch mode

2020-09-15 Thread zheng faaron
Hi All,

I noticed that AllVerticesInSameSlotSharingGroupByDefault is set to false in 
Flink 1.10, which makes batch jobs request a large number of containers. I'm 
not sure why it is hard-coded to false. I tried setting it to true and found 
that the batch job runs correctly with a small number of containers. Why don't 
we add a configuration option so that users can set it themselves?

Best,
Faaron Zheng


Re: flink1.9.3 on yarn: job submission questions

2020-09-08 Thread zheng faaron
Hi,

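On the first question: jobs submitted in per-job mode and jobs in session mode do not share a UI at runtime.

On the second question: you can configure yarn.containers.vcores.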

Best,
Faaron Zheng


From: 宁吉浩 
Sent: Monday, September 7, 2020 3:23:12 PM
To: user-zh 
Subject: flink1.9.3 on yarn: job submission questions

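I am submitting jobs with bin/flink run -m yarn-cluster;
I ran into two problems:
1. Both jobs are visible in the same Flink cluster UI, and even share a UI with the earlier flink-session cluster (used for testing). Is this normal?
2. I know the number of TMs is determined by the parallelism and the slot settings. As the figure below shows, the two jobs occupy 8 YARN containers in total. How should the CPU be configured?
PS: With Spark I could set the number of executor cores directly, but I can't find the equivalent here. Surely a TM with 8 slots shouldn't be limited to a single CPU.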
[cid:__aliyun159946339265863261]
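
To make the relationship between parallelism, slots, containers, and vcores concrete, here is a small Java sketch. It assumes one TaskManager per YARN container, one extra container for the JobManager, and that all operators share slots so a job needs as many slots as its parallelism. The class name and the example numbers are hypothetical and are not taken from the jobs in this thread.

```java
public class YarnContainerEstimate {
    /** TaskManagers needed to supply `parallelism` slots (rounds up). */
    public static int taskManagers(int parallelism, int slotsPerTaskManager) {
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    /** YARN containers for one job: its TaskManagers plus one JobManager. */
    public static int containers(int parallelism, int slotsPerTaskManager) {
        return taskManagers(parallelism, slotsPerTaskManager) + 1;
    }

    /** Total vcores held by the TaskManager containers of one job. */
    public static int taskManagerVcores(int parallelism, int slotsPerTaskManager,
                                        int vcoresPerContainer) {
        return taskManagers(parallelism, slotsPerTaskManager) * vcoresPerContainer;
    }

    public static void main(String[] args) {
        // Hypothetical job: parallelism 6, 2 slots per TM, 2 vcores per container.
        int parallelism = 6;
        int slots = 2;
        int vcores = 2;
        System.out.println("TaskManagers: " + taskManagers(parallelism, slots));
        System.out.println("Containers:   " + containers(parallelism, slots));
        System.out.println("TM vcores:    " + taskManagerVcores(parallelism, slots, vcores));
    }
}
```

In this sketch the per-container vcores value plays the role of the `yarn.containers.vcores` setting, which is the Flink-on-YARN counterpart of Spark's executor cores.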