[ 
https://issues.apache.org/jira/browse/FLINK-24182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arvid Heise updated FLINK-24182:
--------------------------------
    Comment: was deleted

(was: I'm not entirely sure if it's directly connected, but when spinning the 
KinesisITCase a few hundred times I got the following error on 
stop-with-savepoint with drain. It killed the VM.

{noformat}
428798 [flink-akka.actor.default-dispatcher-8] INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - Triggering 
stop-with-savepoint for job 899072e7d760a9e9c4f9db7c878748cc.
429849 [jobmanager-io-thread-9] WARN  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Received late 
message for now expired checkpoint attempt 3 from task 
e90d11e5ffafb05b16aab25086a8c8ea of job 899072e7d760a9e9c4f9db7c878748cc at 
9e78fccd-6260-442b-afa4-c9ab3ba447ea @ localhost (dataPort=-1).
433017 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering 
checkpoint 4 (type=SAVEPOINT_TERMINATE) @ 1631039319955 for job 
899072e7d760a9e9c4f9db7c878748cc.
433018 [Source: Custom Source -> Map -> Sink: Data stream collect sink (1/1)#0] 
INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0 
...
433018 [Source: Custom Source -> Map -> Sink: Data stream collect sink (1/1)#0] 
INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Shutting down the shard consumer threads of subtask 0 ...
433018 [shardConsumers-Source: Custom Source -> Map -> Sink: Data stream 
collect sink (1/1)#0-thread-0] INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0 
...
java.lang.InterruptedException: sleep interrupted
        at java.lang.Thread.sleep(Native Method) ~[?:1.8.0_222]
        at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.adjustRunLoopFrequency(PollingRecordPublisher.java:216)
 ~[classes/:?]
        at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:124)
 ~[classes/:?]
        at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:102)
 ~[classes/:?]
        at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
 [classes/:?]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_222]
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) 
[?:1.8.0_222]
        at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_222]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_222]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_222]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
433018 [shardConsumers-Source: Custom Source -> Map -> Sink: Data stream 
collect sink (1/1)#0-thread-0] INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Shutting down the shard consumer threads of subtask 0 ...
433019 [Legacy Source Thread - Source: Custom Source -> Map -> Sink: Data 
stream collect sink (1/1)#0] ERROR 
org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: Thread 'Legacy 
Source Thread - Source: Custom Source -> Map -> Sink: Data stream collect sink 
(1/1)#0' produced an uncaught exception. Stopping the process...
java.util.concurrent.CompletionException: 
org.apache.flink.util.WrappingRuntimeException: 
java.util.concurrent.CompletionException: java.lang.InterruptedException: sleep 
interrupted
        at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
 ~[?:1.8.0_222]
        at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
 ~[?:1.8.0_222]
        at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:874)
 ~[?:1.8.0_222]
        at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
 ~[?:1.8.0_222]
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
[?:1.8.0_222]
        at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 [?:1.8.0_222]
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
 [classes/:?]
Caused by: org.apache.flink.util.WrappingRuntimeException: 
java.util.concurrent.CompletionException: java.lang.InterruptedException: sleep 
interrupted
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$assertTriggeringCheckpointExceptions$13(StreamTask.java:1218)
 ~[classes/:?]
        at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
 ~[?:1.8.0_222]
        ... 4 more
Caused by: java.util.concurrent.CompletionException: 
java.lang.InterruptedException: sleep interrupted
        at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
 ~[?:1.8.0_222]
        at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
 ~[?:1.8.0_222]
        at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) 
~[?:1.8.0_222]
        at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
 ~[?:1.8.0_222]
        ... 3 more
Caused by: java.lang.InterruptedException: sleep interrupted
        at java.lang.Thread.sleep(Native Method) ~[?:1.8.0_222]
        at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.adjustRunLoopFrequency(PollingRecordPublisher.java:216)
 ~[classes/:?]
        at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:124)
 ~[classes/:?]
        at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:102)
 ~[classes/:?]
        at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
 ~[classes/:?]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_222]
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) 
~[?:1.8.0_222]
        at java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:1.8.0_222]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_222]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_222]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_222]
{noformat}

Note that I previously complained about the interruption in the context of 
Kafka. Now it seems like Kinesis also suffers from it.)

> Tasks canceler should not immediately interrupt
> -----------------------------------------------
>
>                 Key: FLINK-24182
>                 URL: https://issues.apache.org/jira/browse/FLINK-24182
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>            Reporter: Arvid Heise
>            Priority: Major
>
> While debugging resource leaks (FLINK-24131), I found that any connector is 
> immediately interrupted on cancel. Hence, any attempts of using blocking 
> calls in {{close}} to cleanup resources are immediately unreliable (e.g. 
> aborting transactions).
> It would be nice if tasks get a grace period (e.g. 
> task.cancellation.interval) where they can try to free resources in a proper, 
> potentially blocking fashion before being interrupted.
> Nevertheless, connectors should always expect interruptions during shutdown, 
> in particular when the user-configurable grace period is depleted. I'd add 
> that to the connector documentation in a separate effort.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to