Did some digging... it definitely appears that the Amazon SDK is not
picking up the interrupt. I will try playing with the connection timeout.
Hadoop defaults it to 200000 ms, which may be part of the problem.
Anyone have any other ideas?
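
To illustrate what I mean by "not picking up the interrupt", here is a
minimal, JDK-only sketch. It is not the SDK's or Hadoop's code. The class
and method names are made up for the example. It blocks a thread in a
plain java.net.Socket read against a local server that never sends
anything, then interrupts that thread. On the JDKs I know of, an ordinary
(platform) thread ignores the interrupt here, and the read only ends when
the socket timeout fires.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of Flink, Hadoop, or the AWS SDK.
public class BlockingReadInterruptDemo {

    /** How the blocked read ended and how long the reader thread ran. */
    public static final class Result {
        public final String outcome;
        public final long elapsedMs;

        Result(String outcome, long elapsedMs) {
            this.outcome = outcome;
            this.elapsedMs = elapsedMs;
        }
    }

    /**
     * Blocks a thread in Socket read against a silent local peer, interrupts
     * it after interruptAfterMs, and waits for the read to end on its own.
     */
    public static Result blockedRead(int soTimeoutMs, long interruptAfterMs) throws Exception {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {

            client.setSoTimeout(soTimeoutMs);
            AtomicReference<String> outcome = new AtomicReference<>("still blocked");

            Thread reader = new Thread(() -> {
                try {
                    int b = client.getInputStream().read(); // peer never writes
                    outcome.set("read returned " + b);
                } catch (SocketTimeoutException e) {
                    outcome.set("timed out");
                } catch (IOException e) {
                    outcome.set("io error");
                }
            });

            long start = System.nanoTime();
            reader.start();
            Thread.sleep(interruptAfterMs);
            reader.interrupt(); // the blocked socket read is expected to ignore this
            reader.join();      // returns only once the read itself gives up
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            return new Result(outcome.get(), elapsedMs);
        }
    }

    public static void main(String[] args) throws Exception {
        Result r = blockedRead(1000, 200);
        System.out.println(r.outcome + " after " + r.elapsedMs + " ms");
    }
}
```

If the 200000 ms default is what applies to the read in the trace (an
assumption, I have not confirmed which timeout is active during the TLS
handshake), a single blocked read could outlast the 180 seconds the task
canceler waits before failing the TaskManager.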

In theory this should be fixed by SDK v2 which uses NIO, but I don't think
I'm up for all the changes that would involve in the downstream components.
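
For comparison, a minimal JDK-only sketch of why NIO behaves differently.
It does not use SDK v2 (whose async client is, as far as I know, built on
Netty rather than on a raw SocketChannel), so treat it only as an
illustration of the general property. The class and method names are made
up. A thread blocked in SocketChannel.read() is woken by an interrupt: the
channel is closed and the read throws ClosedByInterruptException.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of Flink, Hadoop, or the AWS SDK.
public class InterruptibleChannelDemo {

    /**
     * Blocks a thread in SocketChannel.read() against a silent local peer,
     * interrupts it after interruptAfterMs, and reports how the read ended.
     */
    public static String interruptedChannelRead(long interruptAfterMs) throws Exception {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             SocketChannel channel = SocketChannel.open(
                     new InetSocketAddress(server.getInetAddress(), server.getLocalPort()));
             Socket accepted = server.accept()) {

            AtomicReference<String> outcome = new AtomicReference<>("still blocked");

            Thread reader = new Thread(() -> {
                try {
                    // Channels are in blocking mode by default; the peer never writes.
                    int n = channel.read(ByteBuffer.allocate(1));
                    outcome.set("read returned " + n);
                } catch (ClosedByInterruptException e) {
                    outcome.set("closed by interrupt");
                } catch (IOException e) {
                    outcome.set("io error");
                }
            });

            reader.start();
            Thread.sleep(interruptAfterMs);
            reader.interrupt();  // closes the channel and wakes the blocked read
            reader.join(5000);   // bounded wait so the demo cannot hang
            return outcome.get();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(interruptedChannelRead(200));
    }
}
```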

On Thu, Oct 8, 2020 at 8:36 AM Dan Diephouse <d...@netzooid.com> wrote:

> Using the latest - 1.11.2.
>
> I would assume the interruption is being ignored in the Hadoop / S3 layer.
> I was looking at the defaults and (if I understood correctly) the client
> will retry 20 times, which would explain why it never gets cancelled.
>
> On Thu, Oct 8, 2020 at 1:27 AM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Dan Diephouse,
>>
>> From the logs you provided, it does look like 1 causes 2 => 3 => 4,
>> where 2 is a bug.
>> It's unclear, though, where the interruption is ignored (Flink / Hadoop FS /
>> S3 client).
>>
>> What version of Flink are you using?
>>
>> Regards,
>> Roman
>>
>>
>> On Wed, Oct 7, 2020 at 11:16 PM Dan Diephouse <d...@netzooid.com> wrote:
>>
>>> I am now using the S3 StreamingFileSink to send data to an S3 bucket.
>>> If/when the network connection has issues, it seems to put Flink into an
>>> irrecoverable state. Am I understanding this correctly? Any suggestions on
>>> how to troubleshoot / fix?
>>>
>>> Here is what I'm observing:
>>>
>>> *1. Network is dropped*
>>>
>>> *2. S3 connections do not exit gracefully*
>>>
>>> 2020-10-07 20:58:07.468  WARN 1 --- [aa565930b86fb).]
>>> o.apache.flink.runtime.taskmanager.Task  : Task 'Sink: Unnamed (1/12)' did
>>> not react to cancelling signal for 30 seconds, but is stuck in method:
>>>  java.base@14.0.2/sun.nio.ch.Net.poll(Native Method)
>>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:181)
>>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:285)
>>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
>>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
>>> java.base@14.0.2/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
>>> java.base@14.0.2/java.net.Socket$SocketInputStream.read(Socket.java:982)
>>> java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:469)
>>> java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:463)
>>> java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:160)
>>> java.base@14.0.2/sun.security.ssl.SSLTransport.decode(SSLTransport.java:110)
>>> java.base@14.0.2/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1475)
>>> java.base@14.0.2/sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1381)
>>> java.base@14.0.2/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:441)
>>> java.base@14.0.2/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:412)
>>>
>>> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:436)
>>>
>>> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:384)
>>>
>>> app//com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
>>>
>>> app//org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
>>>
>>> app//org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
>>> jdk.internal.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
>>> java.base@14.0.2/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> java.base@14.0.2/java.lang.reflect.Method.invoke(Method.java:564)
>>>
>>> app//com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
>>> app//com.amazonaws.http.conn.$Proxy89.connect(Unknown Source)
>>>
>>> app//org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
>>>
>>> app//org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
>>>
>>> app//org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
>>>
>>> app//org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>>>
>>> app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>>>
>>> app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>>>
>>> app//com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>>>
>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1323)
>>>
>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
>>>
>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
>>>
>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
>>>
>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
>>>
>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
>>>
>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
>>>
>>> app//com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
>>>
>>> app//com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
>>>
>>> app//com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5054)
>>>
>>> app//com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5000)
>>>
>>> app//com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3574)
>>>
>>> app//org.apache.hadoop.fs.s3a.S3AFileSystem.initiateMultipartUpload(S3AFileSystem.java:2597)
>>>
>>> *3. Tasks do not complete*
>>>
>>> 2020-10-07 21:00:37.458 ERROR 1 --- [9580107498927).]
>>> o.a.f.runtime.taskexecutor.TaskExecutor  : Task did not exit gracefully
>>> within 180 + seconds.
>>>
>>> org.apache.flink.util.FlinkRuntimeException: Task did not exit
>>> gracefully within 180 + seconds.
>>> at
>>> org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572)
>>> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>> at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
>>>
>>> 2020-10-07 21:00:37.459 ERROR 1 --- [9580107498927).]
>>> o.a.f.runtime.minicluster.MiniCluster    : TaskManager #0 failed.
>>>
>>> org.apache.flink.util.FlinkRuntimeException: Task did not exit
>>> gracefully within 180 + seconds.
>>> at
>>> org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572)
>>> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>> at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
>>>
>>> *4. When trying to restart, there are no slots*
>>>
>>> 2020-10-07 21:00:37.486  INFO 1 --- [t-dispatcher-46]
>>> o.a.f.r.executiongraph.ExecutionGraph    : Compute vehicle location from
>>> tag reads -> Sink: Vehicle Event Sink (2/12)
>>> (064e7b4692314286791305bfa636b209) switched from SCHEDULED to FAILED on not
>>> deployed.
>>>
>>> java.util.concurrent.CompletionException:
>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>> No pooled slot available and request to ResourceManager for new slot failed
>>> at
>>> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
>>> ~[na:na]
>>> at
>>> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
>>> ~[na:na]
>>> at
>>> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
>>> ~[na:na]
>>> at
>>> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>>> ~[na:na]
>>> at
>>> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2152)
>>> ~[na:na]
>>> at
>>> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.slotRequestToResourceManagerFailed(SlotPoolImpl.java:360)
>>> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>> at
>>> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:348)
>>> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>> at
>>> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>>> ~[na:na]
>>> at
>>> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>>> ~[na:na]
>>> at
>>> java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>>> ~[na:na]
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
>>> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
>>> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>> at
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>> ~[scala-library-2.11.12.jar:na]
>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>> ~[scala-library-2.11.12.jar:na]
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>> ~[scala-library-2.11.12.jar:na]
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>> ~[scala-library-2.11.12.jar:na]
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>> at
>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>> at
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>> Caused by:
>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>> No pooled slot available and request to ResourceManager for new slot failed
>>> ... 27 common frames omitted
>>> Caused by:
>>> org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException:
>>> Could not fulfill slot request d0e3ac18beece314e56aa23cd2265d92.
>>> at
>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:391)
>>> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>> at
>>> org.apache.flink.runtime.resourcemanager.ResourceManager.requestSlot(ResourceManager.java:457)
>>> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>> at
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>> Method) ~[na:na]
>>> at
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> ~[na:na]
>>> at
>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> ~[na:na]
>>> at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
>>> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>>> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>> ... 20 common frames omitted
>>> Caused by:
>>> org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException:
>>> Could not fulfill slot request d0e3ac18beece314e56aa23cd2265d92. Requested
>>> resource profile (ResourceProfile{UNKNOWN}) is unfulfillable.
>>> at
>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$fulfillPendingSlotRequestWithPendingTaskManagerSlot$11(SlotManagerImpl.java:882)
>>> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>> at
>>> org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52)
>>> ~[flink-core-1.11.2.jar:1.11.2]
>>> at
>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot(SlotManagerImpl.java:878)
>>> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>> at
>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$internalRequestSlot$9(SlotManagerImpl.java:865)
>>> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>> at
>>> org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52)
>>> ~[flink-core-1.11.2.jar:1.11.2]
>>> at
>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:865)
>>> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>> at
>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:386)
>>> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>> ... 27 common frames omitted
>>>
>>> Any thoughts / suggestions are much appreciated.
>>>
>>> --
>>> Dan Diephouse
>>> @dandiep
>>>
>>
>
> --
> Dan Diephouse
> @dandiep
>


-- 
Dan Diephouse
@dandiep
