Job stuck in CREATED state with scheduling failures

2023-01-21 Thread Gyula Fóra
Hi Devs!

We noticed a very strange failure scenario a few times recently with the
Native Kubernetes integration.

The issue is triggered by a heartbeat timeout (a temporary network
problem). We observe the following behaviour:

===
3 pods (1 JM, 2 TMs), Flink 1.15 (Kubernetes Native Integration):

1. Temporary network problem
 - Heartbeat failure, TM1 loses JM connection and JM loses TM1 connection.
 - Both the JM and TM1 trigger the job failure on their sides and cancel
the tasks
 - JM releases TM1 slots

2. While failing/cancelling the job, the network connection recovers and
TM1 reconnects to JM:
*TM1: Resolved JobManager address, beginning registration*

3. JM tries to resubmit the job using TM1 + TM2 but the scheduler keeps
failing as it cannot seem to allocate all the resources:

*NoResourceAvailableException: Slot request bulk is not fulfillable! Could
not allocate the required slot within slot request timeout*
On TM1 we see the following logs repeating (multiple times every few
seconds until the slot request times out after 5 minutes):
*Receive slot request ... for job ... from resource manager with leader id
...*
*Allocated slot for ...*
*Receive slot request ... for job ... from resource manager with leader id
...*
*Allocated slot for *
*Free slot TaskSlot(index:0, state:ALLOCATED, resource profile:
ResourceProfile{...}, allocationId: ..., jobId: ...).*

While all these are happening on TM1 we don't see any allocation related
INFO logs on TM2.
===

Seems like something weird happens when TM1 reconnects after the heartbeat
loss. I feel that the JM should probably shut down the TM and create a new
one. But instead it gets stuck.

Any ideas what could be happening here?

Thanks
Gyula
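
[Editor's sketch] The repeating request/allocate/free pattern described for TM1 above can be counted from a TaskManager log with a few lines of Python. This is only a minimal sketch: the message prefixes are copied from the excerpts in this mail, while the function names, the sample lines and the "churn" heuristic (several slot requests plus at least one freed slot) are assumptions made for illustration, not anything Flink provides.

```python
import re
from collections import Counter

# Message prefixes taken from the TM1 log excerpts quoted in the thread.
PATTERNS = {
    "request": re.compile(r"Receive slot request"),
    "allocated": re.compile(r"Allocated slot for"),
    "freed": re.compile(r"Free slot TaskSlot"),
}


def count_slot_events(lines):
    """Count slot request, allocation and free messages in TaskManager log lines."""
    counts = Counter()
    for line in lines:
        for name, pattern in PATTERNS.items():
            if pattern.search(line):
                counts[name] += 1
    return counts


def looks_like_allocation_churn(counts, min_requests=3):
    """Hypothetical heuristic: repeated requests plus a freed slot suggest churn."""
    return counts["request"] >= min_requests and counts["freed"] > 0


# Hypothetical sample built from the message templates quoted in the thread.
sample = [
    "Receive slot request ... for job ... from resource manager with leader id ...",
    "Allocated slot for ...",
    "Receive slot request ... for job ... from resource manager with leader id ...",
    "Allocated slot for ...",
    "Free slot TaskSlot(index:0, state:ALLOCATED, resource profile: ...)",
    "Receive slot request ... for job ... from resource manager with leader id ...",
]

counts = count_slot_events(sample)
print(dict(counts))
print(looks_like_allocation_churn(counts))
```

Running the same counter over the TM2 log for the same time window would show whether TM2 really receives no slot requests at all, which is what the missing INFO logs suggest.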


Re: Kubernetes JobManager and TaskManager minimum/maximum resources

2023-01-21 Thread Gyula Fóra
But of course the actual memory requirement will largely depend on the type
of job, state backend, number of task slots, etc.

Production TMs/JMs usually have far more resources allocated than 2 GB / 1 CPU,
as you never want to run out of them :)

Gyula

On Sat, 21 Jan 2023 at 11:17, Gyula Fóra wrote:

> Hi!
>
> I think the examples allocate too many resources by default and we should
> reduce them in the YAMLs.
>
> 1 GB memory and 0.5 CPU should be more than enough; we could probably get
> away with even less for example purposes.
>
> Would you have time to try this out and maybe contribute this
> improvement? :)
>
> Thanks
> Gyula
>
>
> On Fri, 20 Jan 2023 at 05:32, Lee Parayno wrote:
>
>> For application mode FlinkDeployments (maybe even session mode) in
>> Kubernetes from the Flink Kubernetes Operator what is the absolute minimum
>> amount of CPU and RAM that is required to run the JobManager and
>> TaskManager processes?
>>
>> Some of the example deployment YAMLs have CPU set at 1 full vCPU
>> and memory at 2GB (2048 MB).  If you factor in JobManager HA, and 1 or more
>> TaskManagers (not sure what is the bounding limit for these processes), you
>> can be at 3 vCPU and 6 GB memory used just by the “Flink Infrastructure”
>> not counting the Job pods.
>>
>> Has anyone seen a need to have more resources dedicated to these
>> processes for some reason?  Has anyone run it leaner than this (like with
>> 0.5 vCPU and less than 1GB memory) in production?
>>
>> Comparing this to Google Cloud Platform and the Dataflow Runner, AFAIK
>> the only resources utilized (that customers pay for) are the Job instances.
>>
>> Lee Parayno
>> Sent from my iPhone
>
>


Re: Kubernetes JobManager and TaskManager minimum/maximum resources

2023-01-21 Thread Gyula Fóra
Hi!

I think the examples allocate too many resources by default and we should
reduce them in the YAMLs.

1 GB memory and 0.5 CPU should be more than enough; we could probably get
away with even less for example purposes.

Would you have time to try this out and maybe contribute this
improvement? :)

Thanks
Gyula


On Fri, 20 Jan 2023 at 05:32, Lee Parayno wrote:

> For application mode FlinkDeployments (maybe even session mode) in
> Kubernetes from the Flink Kubernetes Operator what is the absolute minimum
> amount of CPU and RAM that is required to run the JobManager and
> TaskManager processes?
>
> Some of the example deployment YAMLs have CPU set at 1 full vCPU
> and memory at 2GB (2048 MB).  If you factor in JobManager HA, and 1 or more
> TaskManagers (not sure what is the bounding limit for these processes), you
> can be at 3 vCPU and 6 GB memory used just by the “Flink Infrastructure”
> not counting the Job pods.
>
> Has anyone seen a need to have more resources dedicated to these processes
> for some reason?  Has anyone run it leaner than this (like with 0.5 vCPU
> and less than 1GB memory) in production?
>
> Comparing this to Google Cloud Platform and the Dataflow Runner, AFAIK the
> only resources utilized (that customers pay for) are the Job instances.
>
> Lee Parayno
> Sent from my iPhone
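
[Editor's sketch] The footprint arithmetic in the question above (3 vCPU and 6 GB for the Flink processes) can be reproduced, and compared with the smaller per-pod sizing suggested in the reply, with a short Python calculation. This is only a sketch under stated assumptions: "JobManager HA" is taken to mean two JobManager pods next to one TaskManager, every pod is given the same resources, "1gb" is read as 1024 MB, and the class and function names are invented for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PodResources:
    cpu: float       # vCPU per pod
    memory_mb: int   # memory per pod in MB


def total_footprint(per_pod, job_managers, task_managers):
    """Sum CPU and memory over all JobManager and TaskManager pods."""
    pods = job_managers + task_managers
    return PodResources(cpu=per_pod.cpu * pods, memory_mb=per_pod.memory_mb * pods)


# From the thread: the example YAMLs use 1 vCPU and 2048 MB per pod.
example_default = PodResources(cpu=1.0, memory_mb=2048)
# From the reply: 0.5 CPU and 1 GB (assumed here to mean 1024 MB).
reduced = PodResources(cpu=0.5, memory_mb=1024)

# Assumption: HA means 2 JobManagers, plus a single TaskManager.
print(total_footprint(example_default, job_managers=2, task_managers=1))
print(total_footprint(reduced, job_managers=2, task_managers=1))
```

With these assumptions the default sizing adds up to 3 vCPU and 6144 MB, matching the numbers in the question, while the reduced sizing comes to 1.5 vCPU and 3072 MB. As noted elsewhere in the thread, real memory needs depend on the job, the state backend and the number of task slots.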


Re: DuplicateJobSubmissionException on restart after taskmanagers crash

2023-01-21 Thread Gyula Fóra
Hi Javier,

I will try to look into this as I have not personally seen this problem
while using the operator.

It would be great if you could reach out to me on slack or email directly
so we can discuss the issue and get to the bottom of it.

Cheers
Gyula

On Fri, 20 Jan 2023 at 23:53, Javier Vegas wrote:

> My issue is described in https://issues.apache.org/jira/browse/FLINK-21928,
> which says it was fixed in 1.14, but I am still seeing the problem.
> However, the ticket also says:
>
> "Additionally, it is still required that the user cleans up the
> corresponding HA entries for the running jobs registry because these
> entries won't be reliably cleaned up when encountering the situation
> described by FLINK-21928."
>
> So I guess I need to do some manual cleanup of my S3 HA data before
> restarting.
>
> On Fri, 20 Jan 2023 at 4:58, Javier Vegas wrote:
>
>>
>> I have a Flink app (Flink 1.16.0, deployed to Kubernetes via operator
>> 1.3.1 and using Kubernetes HighAvailability with storage in S3) that
>> depends on multiple Thrift services for data queries. When one of those
>> services is down (or throws exceptions) the Flink job managers end up
>> crashing and only the task managers remain up. Once the dependencies are
>> fixed, when I try to restart the Flink app I end up with a
>> "DuplicateJobSubmissionException: Job has already been submitted" (see
>> below for detailed log) and the task managers never start. The only
>> solution I have found is to delete the deployment from Kubernetes and then
>> deploy again as a new job.
>>
>> 1) Is there a better way to handle failures on dependencies than letting
>> task managers crash and keep job managers up, and restart after
>> dependencies are fixed?
>> 2) If not, is there a way to handle the DuplicateJobSubmissionException
>> so the Flink app can be restarted without having to uninstall it first?
>>
>> Thanks,
>>
>> Javier Vegas
>>
>>
>> org.apache.flink.util.FlinkException: Failed to execute job
>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
>> Caused by: org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted.
>> at org.apache.flink.runtime.client.DuplicateJobSubmissionException.ofGloballyTerminated(DuplicateJobSubmissionException.java:35)
>> at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:449)
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>> at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
>> at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> at akka.actor.Actor.aroundReceive(Actor.scala:537)
>> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>> ... 5 more
>> Exception thrown in main on startup
>>
>>
>>
>