[https://issues.apache.org/jira/browse/FLINK-31183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]

Danny Cranmer resolved FLINK-31183.
-----------------------------------
    Resolution: Fixed

> Flink Kinesis EFO Consumer can fail to stop gracefully
> ------------------------------------------------------
>
>                 Key: FLINK-31183
>                 URL: https://issues.apache.org/jira/browse/FLINK-31183
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.15.3, 1.16.1, aws-connector-4.0.0
>            Reporter: Danny Cranmer
>            Assignee: Danny Cranmer
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.4, aws-connector-4.1.0, 1.16.2
>
>
> *Background*
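> When a Flink job is stopped with the stop-with-savepoint API, the Kinesis EFO 
> (Enhanced Fan-Out) source can fail to close gracefully. In the trace below, a 
> shard consumer thread requests another record. That request makes the AWS SDK 
> cancel the underlying subscription by submitting a task to its (shaded) Netty 
> event executor. That executor has already terminated, so the submission is 
> rejected with a RejectedExecutionException ("event executor terminated"). The 
> exception fails the source task, and the savepoint's checkpoint is then declined 
> with "Task has failed".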
>  
> Sample stack trace
> {code:java}
> 2023-02-16 20:45:40
> org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
>       at 
> org.apache.flink.runtime.messages.checkpoint.SerializedCheckpointException.unwrap(SerializedCheckpointException.java:51)
>       at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1013)
>       at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
>       at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>       at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>       at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task name 
> with subtask : Source: vas_source_stream (38/48)#0 Failure reason: Task has 
> failed.
>       at 
> org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1395)
>       at 
> org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1338)
>       at 
> java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
>       at 
> java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
>       at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>       at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343)
> Caused by: java.util.concurrent.CompletionException: 
> java.util.concurrent.RejectedExecutionException: event executor terminated
>       at 
> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
>       at 
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
>       at 
> java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063)
>       ... 3 more
> Caused by: java.util.concurrent.RejectedExecutionException: event executor 
> terminated
>       at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923)
>       at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350)
>       at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343)
>       at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825)
>       at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:815)
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher$ChannelSubscription.cancel(HandlerPublisher.java:502)
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.async.DelegatingSubscription.cancel(DelegatingSubscription.java:37)
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2ResetSendingSubscription.cancel(Http2ResetSendingSubscription.java:41)
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.async.DelegatingSubscription.cancel(DelegatingSubscription.java:37)
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$OnCancelSubscription.cancel(ResponseHandler.java:409)
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.async.FlatteningSubscriber$1.cancel(FlatteningSubscriber.java:98)
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.async.FlatteningSubscriber.handleStateUpdate(FlatteningSubscriber.java:170)
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.async.FlatteningSubscriber.access$100(FlatteningSubscriber.java:29)
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.async.FlatteningSubscriber$1.request(FlatteningSubscriber.java:93)
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.async.DelegatingSubscription.request(DelegatingSubscription.java:32)
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.async.DelegatingSubscription.request(DelegatingSubscription.java:32)
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.async.DelegatingSubscription.request(DelegatingSubscription.java:32)
>       at 
> org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber$FanOutShardSubscription.requestRecord(FanOutShardSubscriber.java:401)
>       at 
> org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:355)
>       at 
> org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:189)
>       at 
> org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:169)
>       at 
> org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:124)
>       at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
>       at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>       at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>       at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>       at java.base/java.lang.Thread.run(Thread.java:829)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to