Hi Matthias,

Thanks for taking the time to help us with this.

You are right: there were lots of task cancellations, because this
exception causes the job to restart, which in turn triggers the cancellations.
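
To make the suspected mechanism concrete, here is a minimal sketch in plain
JDK code (not Flink code). It assumes the download stream is registered with
a registry that closes it on cancellation, as you describe below.
SimpleRegistry and writeAfterRegistryClose are hypothetical names made up for
this illustration; SimpleRegistry only stands in for Flink's
CloseableRegistry. A write issued after the registry has been closed fails
with the same ClosedChannelException as in our stack trace.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

final class ClosedChannelSketch {

    /** Hypothetical stand-in for a closeable registry: closes everything registered with it. */
    static final class SimpleRegistry implements Closeable {
        private final List<Closeable> registered = new ArrayList<>();

        synchronized void register(Closeable closeable) {
            registered.add(closeable);
        }

        @Override
        public synchronized void close() throws IOException {
            for (Closeable closeable : registered) {
                closeable.close();
            }
            registered.clear();
        }
    }

    /**
     * Writes to a file-backed stream, closes the registry (simulating a task
     * cancellation), then writes again. Returns the simple name of the
     * exception raised by the second write, or "none" if it succeeded.
     */
    static String writeAfterRegistryClose() {
        try {
            Path target = Files.createTempFile("restore-sketch", ".tmp");
            try {
                FileChannel channel = FileChannel.open(target, StandardOpenOption.WRITE);
                OutputStream out = Channels.newOutputStream(channel);

                SimpleRegistry registry = new SimpleRegistry();
                registry.register(out);

                out.write(new byte[] {1, 2, 3}); // channel is open, write succeeds
                registry.close();                // closing the stream closes the channel

                try {
                    out.write(new byte[] {4, 5, 6}); // channel is already closed
                    return "none";
                } catch (ClosedChannelException e) {
                    return e.getClass().getSimpleName();
                }
            } finally {
                Files.deleteIfExists(target);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeAfterRegistryClose());
    }
}
```

If this picture is right, the ClosedChannelException would be a symptom of a
cancellation that started earlier, not the original failure, so the first
error logged before the cancellations is probably the one to look for.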


-
Dhanesh Arole


On Tue, Mar 23, 2021 at 9:27 AM Matthias Pohl <matth...@ververica.com>
wrote:

> Hi Dhanesh,
> thanks for reaching out to the Flink community. Checking the code, it
> looks like the OutputStream is added to a CloseableRegistry before writing
> to it [1].
>
> My suspicion is - based on the exception cause - that the
> CloseableRegistry got triggered while restoring the state. I tried to track
> down the source of the CloseableRegistry. It looks like it's handed down
> from the StreamTask [2].
>
> The StreamTask closes the CloseableRegistry either when cancelling is
> triggered or in the class' finalize method. Have you checked the logs to
> see whether there was some task cancellation logged?
>
> Best,
> Matthias
>
> [1]
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L132
> [2]
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L269
>
> On Fri, Mar 19, 2021 at 5:07 PM dhanesh arole <davcdhane...@gmail.com>
> wrote:
>
>> Hello Hivemind,
>>
>> We are running a stateful streaming job. Each task manager instance hosts
>> roughly 100GB of data. During a restart of the task managers we encountered
>> the following errors, because of which the job is not able to restart.
>> Initially we thought it might be due to failing status checks of the attached
>> EBS volumes or burst balance exhaustion, but the AWS console is not indicating
>> any issue with the EBS volumes. Is there anything else that we need to
>> look at that could potentially cause this exception? It is also quite unclear
>> what exactly the cause of the exception is; any help on that would be much
>> appreciated.
>>
>> Flink version: 1.12.2_scala_2.11
>> Environment: Kubernetes on AWS
>> Volume Type: EBS, gp2 300GiB
>>
>> *ERROR
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder []
>> - Caught unexpected exception.*
>> *java.nio.channels.ClosedChannelException: null*
>> * at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
>> ~[?:?]*
>> * at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:266) ~[?:?]*
>> * at java.nio.channels.Channels.writeFullyImpl(Channels.java:74) ~[?:?]*
>> * at java.nio.channels.Channels.writeFully(Channels.java:97) ~[?:?]*
>> * at java.nio.channels.Channels$1.write(Channels.java:172) ~[?:?]*
>> * at
>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:141)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807)
>> ~[?:?]*
>> * at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> ~[?:?]*
>> * at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> ~[?:?]*
>> * at java.lang.Thread.run(Thread.java:830) [?:?]*
>> *2021-03-19 15:26:10,385 WARN
>>  org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] -
>> Exception while restoring keyed state backend for
>> KeyedCoProcessOperator_55a6c4a5d36b0124ad78cbf6bd864bba_(2/8) from
>> alternative (1/1), will retry while more alternatives are available.*
>> *org.apache.flink.runtime.state.BackendBuildingException: Caught
>> unexpected exception.*
>> * at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:362)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>> [flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533)
>> [flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
>> [flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>> [flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>> [flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at java.lang.Thread.run(Thread.java:830) [?:?]*
>> *Caused by: java.nio.channels.ClosedChannelException*
>> * at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
>> ~[?:?]*
>> * at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:266) ~[?:?]*
>> * at java.nio.channels.Channels.writeFullyImpl(Channels.java:74) ~[?:?]*
>> * at java.nio.channels.Channels.writeFully(Channels.java:97) ~[?:?]*
>> * at java.nio.channels.Channels$1.write(Channels.java:172) ~[?:?]*
>> * at
>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:141)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807)
>> ~[?:?]*
>> * at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> ~[?:?]*
>> * at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> ~[?:?]*
>>
>>
>> -
>> Dhanesh Arole
>>
>>
