I'm using the latest release, 1.11.2.

I would assume the interruption is being ignored in the Hadoop / S3 layer.
Looking at the defaults, the client will (if I understood correctly) retry
20 times, which would explain why the task never gets cancelled.
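A quick way to sanity-check that assumption: on a platform thread, Thread.interrupt() only unblocks sleep/wait/join and I/O on an InterruptibleChannel or a Selector. A read on a plain java.net.Socket, which is where the quoted thread dump shows the sink parked (NioSocketImpl.park / Net.poll), ignores the interrupt and only returns when the socket timeout fires or the socket is closed. The sketch below is a standalone demo, not Flink or S3A code. The class name InterruptDemo and the timeout values are arbitrary, and it assumes Flink's cancellation reaches the task thread as an interrupt.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.concurrent.atomic.AtomicLong;

public class InterruptDemo {

    /**
     * Starts a thread that blocks reading from a socket whose peer never sends
     * anything, interrupts it after interruptAfterMillis, and returns how long
     * (in ms) the read actually stayed blocked.
     */
    static long blockedReadMillis(int soTimeoutMillis, long interruptAfterMillis) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // The kernel completes the TCP handshake via the backlog; we never accept()
        // or write, so the client's read has nothing to return.
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort())) {
            client.setSoTimeout(soTimeoutMillis);
            AtomicLong elapsedMillis = new AtomicLong(-1);

            Thread reader = new Thread(() -> {
                long start = System.nanoTime();
                try {
                    client.getInputStream().read(); // blocks until timeout or close
                } catch (SocketTimeoutException expected) {
                    // Only the socket timeout ends the read; the interrupt did not.
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                elapsedMillis.set((System.nanoTime() - start) / 1_000_000);
            });

            reader.start();
            Thread.sleep(interruptAfterMillis);
            reader.interrupt(); // stands in for the cancellation interrupt (assumption)
            reader.join();
            return elapsedMillis.get();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        long blocked = blockedReadMillis(2_000, 200);
        System.out.println("interrupted after 200 ms, read gave up after " + blocked + " ms");
    }
}
```

Running main should report a blocked time close to the 2-second socket timeout rather than the 200 ms at which the interrupt was sent. If every retry attempt can block like this until its own timeout, the total grows with the retry count, which would be consistent with the task outliving Flink's cancellation watchdog.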

On Thu, Oct 8, 2020 at 1:27 AM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:

> Hi Dan Diephouse,
>
> From the logs you provided, it does look like 1 causes 2 => 3 => 4,
> where 2 is a bug.
> It's unclear, though, where the interruption is ignored (Flink, the Hadoop
> FS, or the S3 client).
>
> What version of Flink are you using?
>
> Regards,
> Roman
>
>
> On Wed, Oct 7, 2020 at 11:16 PM Dan Diephouse <d...@netzooid.com> wrote:
>
>> I am now using the S3 StreamingFileSink to send data to an S3 bucket.
>> If/when the network connection has issues, it seems to put Flink into an
>> unrecoverable state. Am I understanding this correctly? Any suggestions on
>> how to troubleshoot or fix this?
>>
>> Here is what I'm observing:
>>
>> *1. Network is dropped*
>>
>> *2. S3 connections do not exit gracefully*
>>
>> 2020-10-07 20:58:07.468  WARN 1 --- [aa565930b86fb).]
>> o.apache.flink.runtime.taskmanager.Task  : Task 'Sink: Unnamed (1/12)' did
>> not react to cancelling signal for 30 seconds, but is stuck in method:
>> java.base@14.0.2/sun.nio.ch.Net.poll(Native Method)
>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:181)
>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:285)
>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
>> java.base@14.0.2/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
>> java.base@14.0.2/java.net.Socket$SocketInputStream.read(Socket.java:982)
>> java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:469)
>> java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:463)
>> java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:160)
>> java.base@14.0.2/sun.security.ssl.SSLTransport.decode(SSLTransport.java:110)
>> java.base@14.0.2/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1475)
>> java.base@14.0.2/sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1381)
>> java.base@14.0.2/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:441)
>> java.base@14.0.2/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:412)
>> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:436)
>> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:384)
>> app//com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
>> app//org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
>> app//org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
>> jdk.internal.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
>> java.base@14.0.2/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> java.base@14.0.2/java.lang.reflect.Method.invoke(Method.java:564)
>> app//com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
>> app//com.amazonaws.http.conn.$Proxy89.connect(Unknown Source)
>> app//org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
>> app//org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
>> app//org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
>> app//org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>> app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>> app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>> app//com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1323)
>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
>> app//com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
>> app//com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
>> app//com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5054)
>> app//com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5000)
>> app//com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3574)
>> app//org.apache.hadoop.fs.s3a.S3AFileSystem.initiateMultipartUpload(S3AFileSystem.java:2597)
>>
>> *3. Tasks do not complete*
>>
>> 2020-10-07 21:00:37.458 ERROR 1 --- [9580107498927).]
>> o.a.f.runtime.taskexecutor.TaskExecutor  : Task did not exit gracefully
>> within 180 + seconds.
>>
>> org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
>> at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>> at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
>>
>> 2020-10-07 21:00:37.459 ERROR 1 --- [9580107498927).]
>> o.a.f.runtime.minicluster.MiniCluster    : TaskManager #0 failed.
>>
>> org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
>> at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>> at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
>>
>> *4. When trying to restart, there are no slots*
>>
>> 2020-10-07 21:00:37.486  INFO 1 --- [t-dispatcher-46]
>> o.a.f.r.executiongraph.ExecutionGraph    : Compute vehicle location from
>> tag reads -> Sink: Vehicle Event Sink (2/12)
>> (064e7b4692314286791305bfa636b209) switched from SCHEDULED to FAILED on not
>> deployed.
>>
>> java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
>> at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[na:na]
>> at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[na:na]
>> at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[na:na]
>> at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[na:na]
>> at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2152) ~[na:na]
>> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.slotRequestToResourceManagerFailed(SlotPoolImpl.java:360) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:348) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>> at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[na:na]
>> at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[na:na]
>> at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[na:na]
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[scala-library-2.11.12.jar:na]
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[scala-library-2.11.12.jar:na]
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[scala-library-2.11.12.jar:na]
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[scala-library-2.11.12.jar:na]
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>> Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
>> ... 27 common frames omitted
>> Caused by: org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not fulfill slot request d0e3ac18beece314e56aa23cd2265d92.
>> at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:391) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>> at org.apache.flink.runtime.resourcemanager.ResourceManager.requestSlot(ResourceManager.java:457) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
>> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
>> at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>> ... 20 common frames omitted
>> Caused by: org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException: Could not fulfill slot request d0e3ac18beece314e56aa23cd2265d92. Requested resource profile (ResourceProfile{UNKNOWN}) is unfulfillable.
>> at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$fulfillPendingSlotRequestWithPendingTaskManagerSlot$11(SlotManagerImpl.java:882) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>> at org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52) ~[flink-core-1.11.2.jar:1.11.2]
>> at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot(SlotManagerImpl.java:878) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>> at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$internalRequestSlot$9(SlotManagerImpl.java:865) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>> at org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52) ~[flink-core-1.11.2.jar:1.11.2]
>> at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:865) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>> at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:386) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>> ... 27 common frames omitted
>>
>> Any thoughts / suggestions are much appreciated.
>>
>> --
>> Dan Diephouse
>> @dandiep
>>
>

-- 
Dan Diephouse
@dandiep
