Hi Steven,

Did you not experience this problem with previous Flink releases (you
marked the topic with 1.7)?

Do you use an HA setup?

Without an HA setup, the blob data that belongs to the job is distributed
from the job master node to all task executors.
Depending on the size of the blob data (jars, serialised user classes, etc.),
this might overwhelm the job master node and its network connections.
It can subsequently slow down the UI, heartbeating, and the initialisation of
task executors and produced partitions, because the task executors contend
for the blob data. When the job is restored, the blob data might not be
fetched again because it is already available.

With an HA setup, you can point high-availability.storageDir to a location
in a DFS, and the DFS will serve the blob data.
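
For illustration, a minimal flink-conf.yaml sketch for such an HA setup might
look like the following. It assumes ZooKeeper-based HA and HDFS as the DFS;
the ZooKeeper host names and the HDFS path are placeholders, not values from
your environment:

```yaml
# flink-conf.yaml (sketch only; host names and paths are placeholders)
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
# HA storage directory in the DFS, as discussed above
high-availability.storageDir: hdfs:///flink/ha/
```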

Otherwise, could you provide the JM log for further investigation?

Best,
Andrey

On Thu, Jan 24, 2019 at 7:06 AM Steven Wu <stevenz...@gmail.com> wrote:

> When we started a high-parallelism (1,600) job without any
> checkpoint/savepoint, the job struggled to be deployed. After a few
> restarts, it eventually got deployed and ran fine after the initial
> struggle. The jobmanager was very busy and the web UI was very slow. I saw
> these two exceptions during the initial failures.
>
> I don't seem to see this issue when starting the same job from an external
> checkpoint, or at least only very rarely.
>
> Has anyone else experienced a similar issue?
>
> Thanks,
> Steven
>
> Exception #1
>
> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id fe55bf158e89cf555be6582e577b9621 timed out.
>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1624)
>     at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Exception #2
>
> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 3cdc03de22920b0dc1ef3d82e06b7d75@7fda7294a52a6da2d831d832223d3627 not found.
>     at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:77)
>     at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:111)
>     at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:155)
>     at java.util.TimerThread.mainLoop(Timer.java:555)
>     at java.util.TimerThread.run(Timer.java:505)
>
>
