Thanks for checking this workaround! I've created a Jira issue [1] to check whether the AWS SDK version can be upgraded in the Flink distribution.
Regards,
Roman

On Fri, Oct 9, 2020 at 12:54 AM Dan Diephouse <d...@netzooid.com> wrote:

> Well, I just dropped in the latest Amazon 1.11.878 SDK and now it
> appears to respect interrupts in a test case I created. (the test fails
> with the SDK that is in use by Flink)
>
> I will try it in a full fledged Flink environment and report back.
>
> On Thu, Oct 8, 2020 at 3:41 PM Dan Diephouse <d...@netzooid.com> wrote:
>
>> Did some digging... definitely appears that the Amazon SDK definitely is
>> not picking up the interrupt. I will try playing with the connection
>> timeout. Hadoop defaults it to 200000 ms, which may be part of the problem.
>> Anyone have any other ideas?
>>
>> In theory this should be fixed by SDK v2 which uses NIO, but I don't
>> think I'm up for all the changes that would involve in the downstream
>> components.
>>
>> On Thu, Oct 8, 2020 at 8:36 AM Dan Diephouse <d...@netzooid.com> wrote:
>>
>>> Using the latest - 1.11.2.
>>>
>>> I would assume the interruption is being ignored in the Hadoop / S3
>>> layer. I was looking at the defaults and (if I understood correctly) the
>>> client will retry 20 times. Which would explain why it never gets
>>> cancelled...
>>>
>>> On Thu, Oct 8, 2020 at 1:27 AM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
>>>> Hi Dan Diephouse,
>>>>
>>>> From the logs you provided indeed it looks like 1 causes 2 => 3 => 4,
>>>> where 2 is a bug.
>>>> It's unclear though where the interruption is ignored (Flink/Hadoop
>>>> FS/S3 client).
>>>>
>>>> What version of Flink are you using?
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>>
>>>> On Wed, Oct 7, 2020 at 11:16 PM Dan Diephouse <d...@netzooid.com> wrote:
>>>>
>>>>> I am now using the S3 StreamingFileSink to send data to an S3 bucket.
>>>>> If/when the network connection has issues, it seems to put Flink into an
>>>>> irrecoverable state. Am I understanding this correctly? Any suggestions on
>>>>> how to troubleshoot / fix?
>>>>>
>>>>> Here is what I'm observing:
>>>>>
>>>>> *1. Network is dropped*
>>>>>
>>>>> *2. S3 connections do not exit gracefully*
>>>>>
>>>>> 2020-10-07 20:58:07.468 WARN 1 --- [aa565930b86fb).] o.apache.flink.runtime.taskmanager.Task : Task 'Sink: Unnamed (1/12)' did not react to cancelling signal for 30 seconds, but is stuck in method:
>>>>> java.base@14.0.2/sun.nio.ch.Net.poll(Native Method)
>>>>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:181)
>>>>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:285)
>>>>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
>>>>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
>>>>> java.base@14.0.2/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
>>>>> java.base@14.0.2/java.net.Socket$SocketInputStream.read(Socket.java:982)
>>>>> java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:469)
>>>>> java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:463)
>>>>> java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:160)
>>>>> java.base@14.0.2/sun.security.ssl.SSLTransport.decode(SSLTransport.java:110)
>>>>> java.base@14.0.2/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1475)
>>>>> java.base@14.0.2/sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1381)
>>>>> java.base@14.0.2/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:441)
>>>>> java.base@14.0.2/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:412)
>>>>> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:436)
>>>>> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:384)
>>>>> app//com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
>>>>> app//org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
>>>>> app//org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
>>>>> jdk.internal.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
>>>>> java.base@14.0.2/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> java.base@14.0.2/java.lang.reflect.Method.invoke(Method.java:564)
>>>>> app//com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
>>>>> app//com.amazonaws.http.conn.$Proxy89.connect(Unknown Source)
>>>>> app//org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
>>>>> app//org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
>>>>> app//org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
>>>>> app//org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>>>>> app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>>>>> app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>>>>> app//com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>>>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1323)
>>>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
>>>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
>>>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
>>>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
>>>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
>>>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
>>>>> app//com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
>>>>> app//com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
>>>>> app//com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5054)
>>>>> app//com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5000)
>>>>> app//com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3574)
>>>>> app//org.apache.hadoop.fs.s3a.S3AFileSystem.initiateMultipartUpload(S3AFileSystem.java:2597)
>>>>>
>>>>> *3. Tasks do not complete*
>>>>>
>>>>> 2020-10-07 21:00:37.458 ERROR 1 --- [9580107498927).] o.a.f.runtime.taskexecutor.TaskExecutor : Task did not exit gracefully within 180 + seconds.
>>>>>
>>>>> org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
>>>>> at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>>>> at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
>>>>>
>>>>> 2020-10-07 21:00:37.459 ERROR 1 --- [9580107498927).] o.a.f.runtime.minicluster.MiniCluster : TaskManager #0 failed.
>>>>>
>>>>> org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
>>>>> at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>>>> at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
>>>>>
>>>>> *4. When trying to restart, there are no slots*
>>>>>
>>>>> 2020-10-07 21:00:37.486 INFO 1 --- [t-dispatcher-46] o.a.f.r.executiongraph.ExecutionGraph : Compute vehicle location from tag reads -> Sink: Vehicle Event Sink (2/12) (064e7b4692314286791305bfa636b209) switched from SCHEDULED to FAILED on not deployed.
>>>>>
>>>>> java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
>>>>> at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[na:na]
>>>>> at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[na:na]
>>>>> at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[na:na]
>>>>> at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[na:na]
>>>>> at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2152) ~[na:na]
>>>>> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.slotRequestToResourceManagerFailed(SlotPoolImpl.java:360) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>>>> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:348) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>>>> at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[na:na]
>>>>> at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[na:na]
>>>>> at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[na:na]
>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[scala-library-2.11.12.jar:na]
>>>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[scala-library-2.11.12.jar:na]
>>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[scala-library-2.11.12.jar:na]
>>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[scala-library-2.11.12.jar:na]
>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>>>> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>>>> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
>>>>> Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
>>>>> ... 27 common frames omitted
>>>>> Caused by: org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not fulfill slot request d0e3ac18beece314e56aa23cd2265d92.
>>>>> at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:391) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>>>> at org.apache.flink.runtime.resourcemanager.ResourceManager.requestSlot(ResourceManager.java:457) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
>>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
>>>>> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
>>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>>>> ... 20 common frames omitted
>>>>> Caused by: org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException: Could not fulfill slot request d0e3ac18beece314e56aa23cd2265d92. Requested resource profile (ResourceProfile{UNKNOWN}) is unfulfillable.
>>>>> at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$fulfillPendingSlotRequestWithPendingTaskManagerSlot$11(SlotManagerImpl.java:882) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>>>> at org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52) ~[flink-core-1.11.2.jar:1.11.2]
>>>>> at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot(SlotManagerImpl.java:878) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>>>> at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$internalRequestSlot$9(SlotManagerImpl.java:865) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>>>> at org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52) ~[flink-core-1.11.2.jar:1.11.2]
>>>>> at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:865) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>>>> at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:386) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>>>>> ... 27 common frames omitted
>>>>>
>>>>> Any thoughts / suggestions are much appreciated.
>>>>>
>>>>> --
>>>>> Dan Diephouse
>>>>> @dandiep
>>>>>
>>>>
>>>
>>> --
>>> Dan Diephouse
>>> @dandiep
>>>
>>
>>
>> --
>> Dan Diephouse
>> @dandiep
>>
>
>
> --
> Dan Diephouse
> @dandiep
>
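
A note on the "stuck in method" trace quoted above, where the task thread is parked in a blocking socket read during the TLS handshake: the sketch below illustrates the general JDK behaviour that is probably involved. It is not the test case mentioned in the thread and not code from the AWS SDK, Hadoop, or Flink; the class and method names are made up for illustration. It assumes a plain java.net.Socket read on an ordinary (platform) thread, where Thread.interrupt() only sets the interrupt flag and the read returns only when data arrives, the socket is closed, or a read timeout (SO_TIMEOUT) expires.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical demo class; names are illustrative only.
class InterruptDemo {

    // Opens a loopback connection whose peer never writes, so any read blocks.
    // Returns true if the reader thread is still blocked after being interrupted.
    public static boolean blockedReadIgnoresInterrupt() throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket accepted = server.accept()) {
            Thread reader = new Thread(() -> {
                try {
                    client.getInputStream().read(); // blocks: nothing is ever sent
                } catch (IOException expectedOnClose) {
                    // reached once the socket is closed below
                }
            });
            reader.setDaemon(true);
            reader.start();
            Thread.sleep(200);   // give the reader time to enter read()
            reader.interrupt();  // sets the interrupt flag only
            reader.join(500);
            boolean stillBlocked = reader.isAlive();
            client.close();      // closing the socket unblocks the read
            reader.join(2000);
            return stillBlocked;
        }
    }

    // Same setup, but with a read timeout: the read gives up on its own.
    // Returns true if the read ended with a SocketTimeoutException.
    public static boolean readTimesOut(int timeoutMillis) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket accepted = server.accept()) {
            client.setSoTimeout(timeoutMillis);
            try {
                client.getInputStream().read();
                return false; // would mean data or EOF arrived, not expected here
            } catch (SocketTimeoutException e) {
                return true;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("still blocked after interrupt: " + blockedReadIgnoresInterrupt());
        System.out.println("read timed out: " + readTimesOut(200));
    }
}
```

If this is indeed the mechanism, a cancelled task can only leave such a read when the read timeout fires or the connection is closed, which would be consistent with the 200000 ms timeout and 20 retries mentioned above adding up to far more than the 180-second cancellation watchdog. The thread does not establish whether a shorter timeout would have helped in this particular case.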