[ https://issues.apache.org/jira/browse/FLINK-24182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411461#comment-17411461 ]
Arvid Heise edited comment on FLINK-24182 at 9/7/21, 8:18 PM:
--------------------------------------------------------------
I'm not entirely sure whether it's directly connected, but when running the
KinesisITCase a few hundred times in a loop, I got the following error on
stop-with-savepoint with drain. It killed the JVM (FatalExitExceptionHandler
stopped the process).
{noformat}
428798 [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Triggering stop-with-savepoint for job 899072e7d760a9e9c4f9db7c878748cc.
429849 [jobmanager-io-thread-9] WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Received late message for now expired checkpoint attempt 3 from task e90d11e5ffafb05b16aab25086a8c8ea of job 899072e7d760a9e9c4f9db7c878748cc at 9e78fccd-6260-442b-afa4-c9ab3ba447ea @ localhost (dataPort=-1).
433017 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 4 (type=SAVEPOINT_TERMINATE) @ 1631039319955 for job 899072e7d760a9e9c4f9db7c878748cc.
433018 [Source: Custom Source -> Map -> Sink: Data stream collect sink (1/1)#0] INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0 ...
433018 [Source: Custom Source -> Map -> Sink: Data stream collect sink (1/1)#0] INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Shutting down the shard consumer threads of subtask 0 ...
433018 [shardConsumers-Source: Custom Source -> Map -> Sink: Data stream collect sink (1/1)#0-thread-0] INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0 ...
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method) ~[?:1.8.0_222]
	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.adjustRunLoopFrequency(PollingRecordPublisher.java:216) ~[classes/:?]
	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:124) ~[classes/:?]
	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:102) ~[classes/:?]
	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114) [classes/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_222]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_222]
	at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_222]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_222]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_222]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
433018 [shardConsumers-Source: Custom Source -> Map -> Sink: Data stream collect sink (1/1)#0-thread-0] INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Shutting down the shard consumer threads of subtask 0 ...
433019 [Legacy Source Thread - Source: Custom Source -> Map -> Sink: Data stream collect sink (1/1)#0] ERROR org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: Thread 'Legacy Source Thread - Source: Custom Source -> Map -> Sink: Data stream collect sink (1/1)#0' produced an uncaught exception. Stopping the process...
java.util.concurrent.CompletionException: org.apache.flink.util.WrappingRuntimeException: java.util.concurrent.CompletionException: java.lang.InterruptedException: sleep interrupted
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_222]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_222]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:874) ~[?:1.8.0_222]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) ~[?:1.8.0_222]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) [?:1.8.0_222]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) [?:1.8.0_222]
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337) [classes/:?]
Caused by: org.apache.flink.util.WrappingRuntimeException: java.util.concurrent.CompletionException: java.lang.InterruptedException: sleep interrupted
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$assertTriggeringCheckpointExceptions$13(StreamTask.java:1218) ~[classes/:?]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) ~[?:1.8.0_222]
	... 4 more
Caused by: java.util.concurrent.CompletionException: java.lang.InterruptedException: sleep interrupted
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_222]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_222]
	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) ~[?:1.8.0_222]
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_222]
	... 3 more
Caused by: java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method) ~[?:1.8.0_222]
	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.adjustRunLoopFrequency(PollingRecordPublisher.java:216) ~[classes/:?]
	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:124) ~[classes/:?]
	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:102) ~[classes/:?]
	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114) ~[classes/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_222]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) ~[?:1.8.0_222]
	at java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:1.8.0_222]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_222]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_222]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_222]
{noformat}
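In the trace above, the interrupt hits the Thread.sleep in the polling run loop during shutdown and escapes as an uncaught exception. One general way for such a loop to tolerate an interrupt that arrives during a requested shutdown is to separate "stop was requested" from "interrupted unexpectedly". The sketch below is hypothetical: InterruptTolerantPollingLoop, shutdown() and failure() are illustrative names, not the actual PollingRecordPublisher or ShardConsumer code, and this is not necessarily the fix chosen for this ticket.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a polling run loop; not Flink's PollingRecordPublisher.
public class InterruptTolerantPollingLoop implements Runnable {
    private final long pollIntervalMillis;
    // Set by shutdown(); volatile so the polling thread sees the request.
    private volatile boolean running = true;
    // Records an interrupt that arrived while no shutdown had been requested.
    private volatile InterruptedException failure;

    public InterruptTolerantPollingLoop(long pollIntervalMillis) {
        this.pollIntervalMillis = pollIntervalMillis;
    }

    /** Requests a graceful stop; call this before interrupting the thread. */
    public void shutdown() {
        running = false;
    }

    public InterruptedException failure() {
        return failure;
    }

    @Override
    public void run() {
        while (running) {
            // Placeholder for fetching a batch of records.
            try {
                // Plays the role of the sleep in adjustRunLoopFrequency().
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                // Keep the interrupt status visible to the owning executor.
                Thread.currentThread().interrupt();
                if (running) {
                    // Nobody asked us to stop: report it as a real failure.
                    failure = e;
                }
                // If shutdown() was requested, the interrupt is expected; exit quietly.
                return;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        InterruptTolerantPollingLoop loop = new InterruptTolerantPollingLoop(10_000);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(loop);
        Thread.sleep(100);      // give the loop time to enter its sleep
        loop.shutdown();        // 1. request a graceful stop
        executor.shutdownNow(); // 2. interrupt the sleeping thread
        boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("terminated=" + terminated + ", failure=" + loop.failure());
    }
}
```

The ordering matters: because shutdown() is called before the interrupt, the loop treats the InterruptedException as part of a normal stop, while an interrupt without a prior shutdown() is still surfaced via failure() instead of being silently swallowed.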
> Tasks canceler should not immediately interrupt
> -----------------------------------------------
>
> Key: FLINK-24182
> URL: https://issues.apache.org/jira/browse/FLINK-24182
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Reporter: Arvid Heise
> Priority: Major
>
> While debugging resource leaks (FLINK-24131), I found that any connector is
> immediately interrupted on cancel. Hence, any attempt to use blocking calls
> in {{close}} to clean up resources (e.g. aborting transactions) is
> inherently unreliable.
> It would be nice if tasks got a grace period (e.g.
> task.cancellation.interval) during which they can try to free resources
> properly, potentially with blocking calls, before being interrupted.
> Nevertheless, connectors should always expect interruptions during shutdown,
> in particular once the user-configurable grace period is exhausted. I'd add
> that to the connector documentation in a separate effort.
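The proposal (cancel first, give the task a grace period for blocking cleanup in close, and interrupt only once that period is exhausted) can be sketched roughly as follows. Everything here is hypothetical: cancelWithGracePeriod, BlockingCleanupConnector and runScenario are illustrative names, not Flink's task canceler or any connector API; a CountDownLatch stands in for a blocking cleanup call such as aborting a transaction, and graceMillis merely plays the role of a setting like task.cancellation.interval.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch of "cancel, wait for a grace period, then interrupt".
public class GracefulCancellation {

    /** Hypothetical connector whose close() performs a blocking cleanup step. */
    static class BlockingCleanupConnector {
        private final CountDownLatch cleanupDone;
        volatile boolean cleanedUp = false;
        volatile boolean interruptedDuringClose = false;

        BlockingCleanupConnector(CountDownLatch cleanupDone) {
            this.cleanupDone = cleanupDone;
        }

        void close() {
            try {
                // Stand-in for a blocking call, e.g. aborting an open transaction.
                cleanupDone.await();
                cleanedUp = true;
            } catch (InterruptedException e) {
                // Connectors must still expect this once the grace period is exhausted.
                interruptedDuringClose = true;
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Runs cancelAction, waits up to graceMillis for the task thread, then interrupts it. */
    static boolean cancelWithGracePeriod(Thread taskThread, Runnable cancelAction, long graceMillis)
            throws InterruptedException {
        cancelAction.run();               // cooperative cancel: no interrupt yet
        taskThread.join(graceMillis);     // grace period for blocking cleanup
        if (!taskThread.isAlive()) {
            return true;                  // finished within the grace period
        }
        taskThread.interrupt();           // grace period exhausted
        return false;
    }

    /** Starts a task that calls close() after being cancelled, then cancels it. */
    static BlockingCleanupConnector runScenario(CountDownLatch cleanupDone, long graceMillis)
            throws InterruptedException {
        BlockingCleanupConnector connector = new BlockingCleanupConnector(cleanupDone);
        CountDownLatch cancelRequested = new CountDownLatch(1);
        Thread task = new Thread(() -> {
            try {
                cancelRequested.await();  // stand-in for the task's main work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            connector.close();            // release resources on the way out
        });
        task.start();
        cancelWithGracePeriod(task, cancelRequested::countDown, graceMillis);
        task.join();
        return connector;
    }

    public static void main(String[] args) throws Exception {
        // Cleanup that completes immediately finishes within the grace period.
        BlockingCleanupConnector fast = runScenario(new CountDownLatch(0), 1_000);
        System.out.println("fast: cleanedUp=" + fast.cleanedUp
                + ", interrupted=" + fast.interruptedDuringClose);
        // Cleanup that never completes is interrupted after the grace period.
        BlockingCleanupConnector stuck = runScenario(new CountDownLatch(1), 200);
        System.out.println("stuck: cleanedUp=" + stuck.cleanedUp
                + ", interrupted=" + stuck.interruptedDuringClose);
    }
}
```

The second scenario illustrates the last point of the description: even with a grace period, close() has to handle InterruptedException, because a cleanup step that blocks for too long is still interrupted in the end.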
--
This message was sent by Atlassian Jira
(v8.3.4#803005)