Re: [Flink 1.7.0] initial failures with starting high-parallelism job without checkpoint/savepoint

2019-01-24 Thread Steven Wu
Hi Andrey,

Weird that I didn't see your reply in my email inbox. My colleague happened
to see it in the Apache archive :)

No, we didn't experience it with 1.4 (the previous version).

Yes, we did use HA setup.

high-availability: zookeeper
high-availability.zookeeper.quorum: ...
high-availability.zookeeper.path.root: ...
high-availability.zookeeper.path.latch: /leaderlatch
high-availability.zookeeper.path.leader: /leader
high-availability.zookeeper.path.jobgraphs: /jobgraphs
high-availability.zookeeper.path.checkpoints: /checkpoints
high-availability.zookeeper.path.checkpoint-counter: /checkpoint-counter
high-availability.storageDir: ...


My colleague (Mark Cho) will provide some additional observations.

Thanks,
Steven



On Wed, Jan 23, 2019 at 10:06 PM Steven Wu  wrote:



Re: [Flink 1.7.0] initial failures with starting high-parallelism job without checkpoint/savepoint

2019-01-24 Thread Andrey Zagrebin
Hi Steven,

Did you experience this problem with the previous Flink release (you marked
the topic with 1.7)?

Do you use an HA setup?

Without an HA setup, the blob data that belongs to the job is distributed
from the job master node to all task executors. Depending on the size of the
blob data (jars, user-serialised classes, etc.), it can overwhelm the job
master node and its network connections. This can subsequently slow down the
UI, heartbeating, and the initialisation of task executors and produced
partitions, because the task executors contend for the blob data. When the
job is restored, the blob data might not need to be fetched because it is
already available.

With an HA setup, you can configure high-availability.storageDir to point to
a DFS, and the DFS will serve the blob data.
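
To illustrate, a minimal HA configuration that offloads blob serving to a
DFS might look like this in flink-conf.yaml (the ZooKeeper quorum and the
storage path below are placeholders, not values from Steven's cluster):

```yaml
# High-availability mode backed by ZooKeeper
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
# DFS directory where JobManager metadata and blobs are stored;
# task executors fetch blob data from here instead of the job master
high-availability.storageDir: hdfs:///flink/ha/
```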

Otherwise, could you provide the JM log for further investigation?

Best,
Andrey

On Thu, Jan 24, 2019 at 7:06 AM Steven Wu  wrote:



[Flink 1.7.0] initial failures with starting high-parallelism job without checkpoint/savepoint

2019-01-23 Thread Steven Wu
When we started a high-parallelism (1,600) job without any
checkpoint/savepoint, the job struggled to get deployed. After a few
restarts, it eventually got deployed and ran fine after the initial
struggle. The jobmanager was very busy and the Web UI was very slow. I saw
the two exceptions/failures below during the initial failures.

I don't seem to see this issue when starting the same job from an external
checkpoint, or at least only very rarely.

Has anyone else experienced a similar issue?

Thanks,
Steven

Exception #1

java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id fe55bf158e89cf555be6582e577b9621 timed out.
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1624)
    at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Exception #2

org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 3cdc03de22920b0dc1ef3d82e06b7d75@7fda7294a52a6da2d831d832223d3627 not found.
    at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:77)
    at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:111)
    at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:155)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)
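
Both symptoms above can show up when the job master is saturated during a
mass deployment: heartbeats time out, and consumers ask for partitions
before the producers have registered them. One way to give the cluster more
slack is to relax the heartbeat and partition-request timeouts in
flink-conf.yaml. This is only a sketch; the keys are standard Flink options,
but the values below are illustrative guesses and should be tuned per
cluster:

```yaml
# Heartbeat settings (milliseconds); raising the timeout keeps task
# managers from being declared dead while the job master is overloaded
heartbeat.interval: 10000
heartbeat.timeout: 120000

# Retry window for partition requests (milliseconds); a larger maximum
# backoff lets consumers keep retrying longer before giving up with a
# PartitionNotFoundException
taskmanager.network.request-backoff.initial: 100
taskmanager.network.request-backoff.max: 30000
```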