Hi!

Adding Ufuk and Till to this...

You are right, these issues should not compromise HA. Could you share the
logs so we can diagnose what the issue was?

@Till, Ufuk: Could it be that the ZooKeeper client gave up for good on
reconnecting to ZooKeeper after a certain time?

Stephan

On Tue, Jan 24, 2017 at 6:16 PM, Shannon Carey <sca...@expedia.com> wrote:

> I am running 1.1.4. It does look like there were problems with the
> connection to ZooKeeper due to the network being overworked. I'm not sure
> what I can do about it (I'm not sure what happens when a JM loses
> leadership), but ideally a cluster-wide failure would not result in losing
> all the jobs, checkpoints, etc.
>
> -Shannon
>
> From: Stephan Ewen <se...@apache.org>
> Date: Tuesday, January 24, 2017 at 8:07 AM
>
> To: <user@flink.apache.org>
> Subject: Re: Rapidly failing job eventually causes "Not enough free slots"
>
> Hi!
>
> I think there were some issues in the HA recovery of 1.1.3 that were fixed
> in 1.1.4 and 1.2.0.
> What version are you running on?
>
> Stephan
>
>
> On Sat, Jan 21, 2017 at 4:58 PM, Ufuk Celebi <u...@apache.org> wrote:
>
>> Hey Shannon,
>>
>> the final truth for recovery is in ZooKeeper. Can you check whether
>> there are also references available in ZooKeeper? Do you have the job
>> manager logs available from after the failure? On recovery, Flink
>> checks ZooKeeper for entries. These point to files in the storageDir.
>> It could have happened that these got out of sync, e.g. entries
>> deleted from ZK but not from the storageDir.
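>>
>> For a quick check, the ZooKeeper CLI can list the recovery entries. The
>> paths below are only the defaults (recovery.zookeeper.path.root = /flink);
>> depending on your recovery.zookeeper.path.* settings and the YARN
>> application namespace, the actual paths may differ:
>>
>>   # connect to your ZooKeeper quorum with the CLI shipped with ZooKeeper
>>   bin/zkCli.sh -server <zk-host>:2181
>>   # inside the CLI, list the HA entries (default paths, adjust as needed)
>>   ls /flink
>>   ls /flink/jobgraphs
>>   ls /flink/checkpoints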
>>
>> Maybe the loss of the task managers can also be explained by a
>> connection loss to ZK or something. When a JM loses leadership, the
>> TMs cancel all tasks and disconnect from the JM. Here again, we would
>> need to look into the logs.
>>
>> Sorry for the bad experience lately :-(
>>
>> – Ufuk
>>
>>
>> On Sat, Jan 21, 2017 at 4:31 AM, Shannon Carey <sca...@expedia.com>
>> wrote:
>> > In fact, I can see all my job jar blobs and some checkpoint & job graph
>> > files in my configured "recovery.zookeeper.storageDir"… however for
>> some
>> > reason it didn't get restored when my new Flink cluster started up.
>> >
>> >
>> > From: Shannon Carey <sca...@expedia.com>
>> > Date: Friday, January 20, 2017 at 9:14 PM
>> > To: "user@flink.apache.org" <user@flink.apache.org>
>> >
>> > Subject: Re: Rapidly failing job eventually causes "Not enough free
>> slots"
>> >
>> > I recently added some better visibility into the metrics we're gathering
>> > from Flink. My Flink cluster died again due to the "Not enough free
>> slots
>> > available to run the job" problem, and this time I can see that the
>> number
>> > of registered task managers went down from 11 to 7, then waffled and
>> only
>> > ever got back up to 10 (one short of the requested amount) before
>> dropping
>> > to 0 just before the cluster died. This would seem to explain why there
>> > weren't sufficient slots (given that we were probably using them all or
>> > nearly all)… The metric of "running jobs" went down from 5 to 3 during
>> this
>> > time period as well. So the problem seems to be loss of taskmanagers
>> due to
>> > errors (not yet sure what exactly, as I still have to delve into the logs).
>> >
>> > The other thing I have to figure out is restoring the jobs… I thought
>> that
>> > HA would start the jobs back up again if Flink died & I re-launched it,
>> but
>> > that doesn't appear to be the case.
>> >
>> >
>> > From: Stephan Ewen <se...@apache.org>
>> > Date: Thursday, January 5, 2017 at 7:52 AM
>> > To: <user@flink.apache.org>
>> > Subject: Re: Rapidly failing job eventually causes "Not enough free
>> slots"
>> >
>> > Another thought on the container failure:
>> >
>> > in 1.1, the user code is loaded dynamically whenever a Task is started.
>> > That means that on every task restart the code is reloaded. For that to
>> > work properly, class unloading needs to happen, or the permgen will
>> > eventually overflow.
>> >
>> > It can happen that class unloading is prevented if the user functions
>> > leave references around as "GC roots", which may be threads, or
>> references
>> > in registries, etc.
>> >
>> > In Flink 1.2, YARN will put the user code into the application classpath,
>> > so the code does not need to be reloaded on every restart. That should
>> > solve the issue. To "simulate" that behavior in Flink 1.1, put your
>> > application code jars into the "lib" folder.
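>> >
>> > For example (the paths and jar name are placeholders for your setup), copy
>> > the job jar into the lib folder of the Flink distribution used to start
>> > the YARN session, so that it ends up on the system classpath:
>> >
>> >   # hypothetical paths - adjust to your installation and artifact name
>> >   cp /path/to/my-job.jar /path/to/flink-1.1.4/lib/
>> >   # then (re)start the YARN session so the lib folder gets shipped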
>> >
>> > Best,
>> > Stephan
>> >
>> >
>> > On Thu, Jan 5, 2017 at 1:15 PM, Yury Ruchin <yuri.ruc...@gmail.com>
>> wrote:
>> >>
>> >> Hi,
>> >>
>> >> I've faced a similar issue recently. I hope sharing my findings will
>> >> help. The problem can be split into two parts:
>> >>
>> >> Source of container failures
>> >> The logs you provided indicate that YARN kills its containers for
>> >> exceeding memory limits. The important point here is that the memory
>> >> limit = JVM heap memory + off-heap memory. So if off-heap memory usage
>> >> is high, YARN may kill containers even though JVM heap consumption is
>> >> fine. To address this, Flink reserves a share of the container memory
>> >> for off-heap usage. How much is reserved is controlled by the
>> >> yarn.heap-cutoff-ratio and yarn.heap-cutoff-min configuration settings.
>> >> By default, 25% of the requested container memory is reserved for
>> >> off-heap. This seems to be a good start, but one should experiment and
>> >> tune it to meet their job's specifics.
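>> >>
>> >> As a rough illustration (the numbers are made up, not a recommendation):
>> >> with a 5632 MB container and the default cutoff ratio of 0.25, about
>> >> 1408 MB are reserved for off-heap usage and roughly 4224 MB remain for
>> >> the JVM heap. If off-heap consumption is higher than that, you could
>> >> raise the cutoff in flink-conf.yaml, e.g.:
>> >>
>> >>   # reserve 40% of the container memory (but at least 600 MB) for off-heap
>> >>   yarn.heap-cutoff-ratio: 0.4
>> >>   yarn.heap-cutoff-min: 600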
>> >>
>> >> It's also worthwhile to figure out what consumes the off-heap memory. Is it
>> >> Flink managed memory moved off heap (taskmanager.memory.off-heap =
>> true)? Is
>> >> it some external library allocating something off heap? Is it your own
>> code?
>> >>
>> >> How Flink handles task manager failures
>> >> Whenever a task manager fails, the Flink jobmanager decides whether it
>> >> should:
>> >> - reallocate the failed task manager container, or
>> >> - fail the application entirely.
>> >> These decisions can be guided by certain configuration settings
>> >> (https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn).
>> >> With default settings, the job manager reallocates task manager
>> >> containers up to the point when N failures have been observed, where N
>> >> is the number of requested task managers. After that, the application is
>> >> stopped.
>> >>
>> >> According to the logs, you have a finite value for
>> >> yarn.maximum-failed-containers (11, as I can see from the logs - this
>> >> may be set by Flink if not provided explicitly). On the 12th container
>> >> failure, the jobmanager gives up and the application stops. I'm not sure
>> >> why it keeps reporting "not enough slots" after that point. In my
>> >> experience this may happen when a job eats up all the available slots,
>> >> so that after a container failure its tasks cannot be restarted in other
>> >> (live) containers. But I believe that once the decision to stop the
>> >> application is made, there should not be any further attempts to restart
>> >> the job, and hence no such log messages.
>> >> Hopefully, someone else will explain this to us :)
>> >>
>> >> In my case I made the jobmanager restart containers indefinitely by
>> >> setting yarn.maximum-failed-containers = -1, so that a taskmanager
>> >> failure never results in application death. Note that this is unlikely
>> >> to be a good choice for a batch job.
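>> >>
>> >> For reference, that corresponds to the following flink-conf.yaml entry
>> >> (it can also be passed as a dynamic property when starting the YARN
>> >> session); whether -1 is a good fit depends on your job:
>> >>
>> >>   # never give up reallocating failed task manager containers
>> >>   yarn.maximum-failed-containers: -1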
>> >>
>> >> Regards,
>> >> Yury
>> >>
>> >> 2017-01-05 3:21 GMT+03:00 Shannon Carey <sca...@expedia.com>:
>> >>>
>> >>> In Flink 1.1.3 on emr-5.2.0, I've experienced a particular problem
>> twice
>> >>> and I'm wondering if anyone has some insight about it.
>> >>>
>> >>> In both cases, we deployed a job that fails very frequently (within
>> >>> 15s-1m of launch). Eventually, the Flink cluster dies.
>> >>>
>> >>> The sequence of events looks something like this:
>> >>>
>> >>> bad job is launched
>> >>> bad job fails & is restarted many times (I didn't have the
>> "failure-rate"
>> >>> restart strategy configuration right)
>> >>> Task manager logs: org.apache.flink.yarn.YarnTaskManagerRunner
>> (SIGTERM
>> >>> handler): RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>> >>> At this point, the YARN resource manager also logs the container
>> failure
>> >>> More logs: Container
>> >>> ResourceID{resourceId='container_1481658997383_0003_01_000013'}
>> failed. Exit
>> >>> status: Pmem limit exceeded (-104)
>> >>> Diagnostics for container
>> >>> ResourceID{resourceId='container_1481658997383_0003_01_000013'} in
>> state
>> >>> COMPLETE : exitStatus=Pmem limit exceeded (-104) diagnostics=Container
>> >>> [pid=21246,containerID=container_1481658997383_0003_01_000013] is
>> running
>> >>> beyond physical memory limits. Current usage: 5.6 GB of 5.6 GB
>> physical
>> >>> memory used; 9.6 GB of 28.1 GB virtual memory used. Killing container.
>> >>> Container killed on request. Exit code is 143
>> >>> Container exited with a non-zero exit code 143
>> >>> Total number of failed containers so far: 12
>> >>> Stopping YARN session because the number of failed containers (12)
>> >>> exceeded the maximum failed containers (11). This number is
>> controlled by
>> >>> the 'yarn.maximum-failed-containers' configuration setting. By
>> default its
>> >>> the number of requested containers.
>> >>> From here onward, the logs repeatedly show that jobs fail to restart
>> >>> due to
>> >>> "org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> >>> Not enough free slots available to run the job. You can decrease the
>> >>> operator parallelism or increase the number of slots per TaskManager
>> in the
>> >>> configuration. Task to schedule: < Attempt #68 (Source: …) @
>> (unassigned) -
>> >>> [SCHEDULED] > with groupID < 73191c171abfff61fb5102c161274145 > in
>> sharing
>> >>> group < SlotSharingGroup [73191c171abfff61fb5102c161274145,
>> >>> 19596f7834805c8409c419f0edab1f1b] >. Resources available to
>> scheduler:
>> >>> Number of instances=0, total number of slots=0, available slots=0"
>> >>> Eventually, Flink stops for some reason (with another SIGTERM
>> message),
>> >>> presumably because of YARN
>> >>>
>> >>> Does anyone have an idea why a bad job repeatedly failing would
>> >>> eventually result in the Flink cluster dying?
>> >>>
>> >>> Any idea why I'd get "Pmem limit exceeded" or "Not enough free slots
>> >>> available to run the job"? The JVM heap usage and the free memory on
>> the
>> >>> machines both look reasonable in my monitoring dashboards. Could it
>> possibly
>> >>> be a memory leak due to classloading or something?
>> >>>
>> >>> Thanks for any help or suggestions you can provide! I am hoping that
>> the
>> >>> "failure-rate" restart strategy will help avoid this issue in the
>> future,
>> >>> but I'd also like to understand what's making the cluster die so that
>> I can
>> >>> prevent it.
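>> >>>
>> >>> For what it's worth, this is roughly the failure-rate setup I intend to
>> >>> try in flink-conf.yaml (assuming I have the key names right; the values
>> >>> are just a guess on my part, not something I've validated):
>> >>>
>> >>>   restart-strategy: failure-rate
>> >>>   # give up if more than 3 failures occur within 5 minutes
>> >>>   restart-strategy.failure-rate.max-failures-per-interval: 3
>> >>>   restart-strategy.failure-rate.failure-rate-interval: 5 min
>> >>>   restart-strategy.failure-rate.delay: 10 s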
>> >>>
>> >>> -Shannon
>> >>
>> >>
>> >
>>
>
>
