Hi David,
We made the changes and are now submitting the jobs using the Flink CLI.
To be more specific
Unfortunately, we are still seeing the same error.
Caused by: java.lang.OutOfMemoryError: unable to create new native thread.
The behavior is the following:
One task manager crashes; from that point on, submitting new jobs fails with the
following error:
Caused by: java.io.IOException: Could not connect to BlobServer at address
Then we saw the native thread error on another task manager.
The cluster stays up but runs no jobs until we restart some of the task / job
managers.
Our blob related configuration:
* blob.server.port: 6124
* blob.fetch.num-concurrent: 300
* blob.fetch.retries: 20
* blob.service.cleanup.interval: 10800
Full stack trace of the submission error:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'job_name'.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'job_name'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
    at com.startapp.consumer.KafkaStreaming.main(KafkaStreaming.java:84)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 8 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:364)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Could not upload job files.
    at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:201)
    at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119)
    at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not upload job files.
    at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:86)
    at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:195)
    ... 11 more
Caused by: java.io.IOException: Could not connect to BlobServer at address DOMAIN/IP:PORT
    at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:102)
    at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$null$3(JobSubmitHandler.java:199)
    at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:82)
    ... 12 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:96)
    ... 14 more
]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:486)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:466)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
    ... 4 more
Ilan Huchansky ● Big data developer
From: Ilan Huchansky <[email protected]>
Date: Tuesday, 7 December 2021 at 11:22
To: David Morávek <[email protected]>
Cc: [email protected] <[email protected]>, Start.io SDP <[email protected]>
Subject: Re: Unable to create new native thread error
Hi David,
In that case, I will start working on using the CLI instead of the REST API
right away.
Will update you when I finish.
Thanks for the help,
Ilan.
From: David Morávek <[email protected]>
Date: Monday, 6 December 2021 at 10:34
To: Ilan Huchansky <[email protected]>
Cc: [email protected] <[email protected]>, Start.io SDP <[email protected]>
Subject: Re: Unable to create new native thread error
Hi Ilan,
I think so, using CLI instead of REST API should solve this, as the user code
execution would be pulled out to a separate JVM. If you're going to try that,
it would be great to hear back whether it has solved your issue.
As for 1.13.4, there is currently no on-going effort / concrete plan on the
release.
Best,
D.
On Sun, Dec 5, 2021 at 4:06 PM Ilan Huchansky
<[email protected]> wrote:
Hi David,
Thanks for your fast response.
Do you think that changing the submission method, using the CLI instead of the
REST API, could solve the problem?
Another question: I see that the most critical issue (FLINK-25022) is in
progress and should be released with version 1.13.4. Do you know when this
version is planned to be released?
Thanks again,
Ilan.
From: David Morávek <[email protected]>
Date: Thursday, 2 December 2021 at 17:25
To: Ilan Huchansky <[email protected]>
Cc: [email protected] <[email protected]>, Start.io SDP <[email protected]>
Subject: Re: Unable to create new native thread error
Hi Ilan,
We are aware of multiple issues where web-submission can result in classloader /
thread local leaks, which could potentially result in the behavior you're
describing. We're working on addressing them.
FLINK-25022 [1]: The most critical one leaking thread locals.
FLINK-25027 [2]: Only a memory improvement for a particular situation (a lot
of small batch jobs); it can be worked around by accounting for it when setting
the Metaspace size.
FLINK-25023 [3]: Can leak the classloader of the first job submitted via the REST
API (constant overhead for Metaspace).
In general, web-submission differs from a normal submission in that
the "main method" of the uploaded jar is executed on the JobManager, and it's really
hard to isolate its execution from possible side effects.
[1] https://issues.apache.org/jira/browse/FLINK-25022
[2] https://issues.apache.org/jira/browse/FLINK-25027
[3] https://issues.apache.org/jira/browse/FLINK-25023
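One way to check whether leaked threads are piling up on a JobManager or TaskManager is to take a thread dump (for example with the JDK's jstack tool) and count threads by name prefix. The following Python sketch assumes the usual dump layout in which each thread entry starts with the thread name in double quotes; the function name and the sample thread names are made up for illustration and are not taken from this cluster.

```python
import re
from collections import Counter


def count_thread_groups(dump_text):
    """Count threads per name prefix in a JVM thread dump.

    Assumes each thread entry starts with the thread name in double quotes.
    A trailing numeric suffix (e.g. "-12" or "#12") is stripped so that
    threads from the same pool are counted together.
    """
    groups = Counter()
    for line in dump_text.splitlines():
        match = re.match(r'^"([^"]+)"', line)
        if match:
            prefix = re.sub(r"[-#]?\d+$", "", match.group(1))
            groups[prefix] += 1
    return groups


# Illustrative sample only; real dumps contain many more fields per line.
sample_dump = "\n".join([
    '"main" #1 prio=5',
    '"pool-1-thread-1" #20 prio=5',
    '"pool-1-thread-2" #21 prio=5',
    "   java.lang.Thread.State: WAITING",
])

groups = count_thread_groups(sample_dump)
for prefix, count in groups.most_common():
    print(prefix, count)
```

Comparing the counts from two dumps taken some hours apart should show which group of threads keeps growing, if any.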
Best,
D.
On Thu, Dec 2, 2021 at 3:51 PM Ilan Huchansky
<[email protected]> wrote:
Hi Flink mailing list,
I am Ilan from the Start.io data platform team, and we need some guidance.
We have a flow with the following use case:
* We read files from AWS S3 buckets, process them on our cluster, and sink
the data into files using the Flink file sink.
* The jobs always use the same jar, which we uploaded to every job manager on
the cluster.
* We are submitting jobs constantly through the REST API.
* Each job reads one or more files from S3.
* The jobs can run from 20 seconds up to 3.5 hours.
* The jobs run in batch mode.
* We are running Flink 1.13.1.
* We are running in cluster mode using Docker; the same machines are used
for task and job managers.
We are struggling with the same error, over and over again. We encounter it in
both the job manager and the task manager.
After the cluster has been running for a while with jobs finishing correctly, the
task and job managers fail to operate due to:
Caused by: java.lang.OutOfMemoryError: unable to create new native thread.
We also see sporadic java.lang.NoClassDefFoundError failures; we are not sure
whether they are related.
Our setup and configuration are as follows:
* 5 nodes cluster running on docker
* Relevant memory config:
jobmanager.memory.heap.size: 1600m
taskmanager.memory.process.size: 231664m
taskmanager.memory.network.fraction: 0.3
taskmanager.memory.jvm-metaspace.size: 10g
jobmanager.memory.jvm-metaspace.size: 2g
taskmanager.memory.framework.off-heap.size: 1g
* Host details
max locked memory (kbytes, -l) 65536
max memory size (kbytes, -m) unlimited
open files (-n) 1024
max user processes (-u) 1547269
virtual memory (kbytes, -v) unlimited
file locks (-x) unlimited
cat /proc/sys/kernel/threads-max: 3094538
kernel.pid_max = 57344
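A note on the host limits listed above: on Linux every thread takes an ID from the same pool as process IDs, so kernel.pid_max can cap the total number of threads well below threads-max or ulimit -u. The Python sketch below simply picks the smallest of the three values quoted above. It assumes these limits apply unchanged inside the Flink containers, which Docker settings (for example a pids cgroup limit) may alter; the function name is illustrative.

```python
def tightest_thread_limit(max_user_processes, threads_max, pid_max):
    """Return (name, value) of the smallest of the three host limits."""
    limits = {
        "ulimit -u (max user processes)": max_user_processes,
        "kernel.threads-max": threads_max,
        "kernel.pid_max": pid_max,
    }
    name = min(limits, key=limits.get)
    return name, limits[name]


# Values copied from the host details above.
name, ceiling = tightest_thread_limit(
    max_user_processes=1547269,
    threads_max=3094538,
    pid_max=57344,
)
print(f"tightest limit: {name} = {ceiling}")
```

With these numbers the tightest value is kernel.pid_max, so raising only the max user processes may not change the point at which thread creation fails.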
We tried increasing the max user processes, and both increasing and decreasing the
jvm-metaspace size.
Should we keep increasing the max number of processes on the host? Is there a
way to limit the number of threads from the Flink config?
What should we do? Any insights?
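To see how close a job manager or task manager process is to a thread limit, its thread count can be sampled over time. The Python sketch below reads the "Threads:" field from /proc/<pid>/status, which is available on Linux; the helper names and the sample text are illustrative, and the PID of the Flink JVM has to be supplied by whoever runs it.

```python
import re
from pathlib import Path


def parse_thread_count(status_text):
    """Extract the value of the 'Threads:' field from /proc/<pid>/status text."""
    match = re.search(r"^Threads:\s+(\d+)$", status_text, re.MULTILINE)
    return int(match.group(1)) if match else None


def thread_count(pid):
    """Read the current thread count of a running process (Linux only)."""
    return parse_thread_count(Path(f"/proc/{pid}/status").read_text())


# Illustrative excerpt of a status file, not taken from the cluster.
sample_status = "Name:\tjava\nPid:\t4242\nThreads:\t317\n"
print(parse_thread_count(sample_status))
```

Calling thread_count(pid) periodically and logging the result should show whether the count grows with every submitted job or stays flat.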
I can provide more information as needed.
Thanks in advance
Ilan