Hi Dhanesh,
thanks for reaching out to the Flink community. From the code, it looks
like the OutputStream is registered with a CloseableRegistry before it is
written to [1].

Based on the exception cause, my suspicion is that the CloseableRegistry was
closed while the state was still being restored, which closes the registered
OutputStream underneath the downloader. I tried to track down where the
CloseableRegistry comes from, and it looks like it's handed down from the
StreamTask [2].

The StreamTask closes the CloseableRegistry either when cancellation is
triggered or in the class's finalize method. Have you checked the logs for
any task cancellation around the time of the failure?
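To illustrate the failure mode I suspect, here is a minimal, self-contained
sketch in plain Java. It does not use Flink at all: `SimpleCloseableRegistry`
and `RegistryCloseDemo` are hypothetical names standing in for Flink's
CloseableRegistry and the downloader, and closing the registry on the same
thread is a simplification of a cancellation arriving from another thread
while a download is in progress. Writing to a FileChannel-backed OutputStream
after the registry has closed it fails with a ClosedChannelException, which
should correspond to the exception at the top of your trace.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

public class RegistryCloseDemo {

    // Hypothetical, simplified stand-in for Flink's CloseableRegistry:
    // it only remembers Closeables and closes all of them on close().
    static final class SimpleCloseableRegistry implements Closeable {
        private final List<Closeable> closeables = new ArrayList<>();

        synchronized void register(Closeable c) {
            closeables.add(c);
        }

        @Override
        public synchronized void close() throws IOException {
            for (Closeable c : closeables) {
                c.close();
            }
            closeables.clear(); // a second close() is a no-op
        }
    }

    /** Returns true if a write after the registry was closed fails with ClosedChannelException. */
    static boolean writeAfterRegistryCloseFails() throws IOException {
        Path target = Files.createTempFile("restore-", ".sst");
        SimpleCloseableRegistry registry = new SimpleCloseableRegistry();
        try {
            // Open a channel-backed stream and register it before writing to it.
            FileChannel channel = FileChannel.open(target, StandardOpenOption.WRITE);
            OutputStream out = Channels.newOutputStream(channel);
            registry.register(out);

            out.write(new byte[] {1, 2, 3}); // succeeds: the stream is still open

            // Assumption: simulates task cancellation closing the registry mid-download.
            registry.close();

            try {
                // The FileChannel behind the stream is closed now, so this write should fail.
                out.write(new byte[] {4, 5, 6});
                return false;
            } catch (ClosedChannelException expected) {
                return true;
            }
        } finally {
            registry.close(); // releases the stream if the demo bailed out early
            Files.deleteIfExists(target);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("ClosedChannelException after registry close: "
                + writeAfterRegistryCloseFails());
    }
}
```

In the real job the close and the write happen on different threads (the
task thread cancelling and the download pool writing), so whether the write
hits a closed channel depends on timing. That is why a cancellation entry in
the logs shortly before the exception would support this theory.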

Best,
Matthias

[1]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L132
[2]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L269

On Fri, Mar 19, 2021 at 5:07 PM dhanesh arole <davcdhane...@gmail.com>
wrote:

> Hello Hivemind,
>
> We are running a stateful streaming job. Each task manager instance hosts
> around 100GB of data. During a restart of the task managers we encountered
> the following errors, because of which the job is not able to restart.
> Initially we thought it might be due to failing status checks of the
> attached EBS volumes or burst balance exhaustion, but the AWS console does
> not indicate any issue with the EBS volumes. Is there anything else we need
> to look at that could potentially cause this exception? It is also quite
> unclear what exactly causes the exception, so any help on that would be
> much appreciated.
>
> Flink version: 1.12.2_scala_2.11
> Environment: Kubernetes on AWS
> Volume Type: EBS, gp2 300GiB
>
> ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Caught unexpected exception.
> java.nio.channels.ClosedChannelException: null
>     at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150) ~[?:?]
>     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:266) ~[?:?]
>     at java.nio.channels.Channels.writeFullyImpl(Channels.java:74) ~[?:?]
>     at java.nio.channels.Channels.writeFully(Channels.java:97) ~[?:?]
>     at java.nio.channels.Channels$1.write(Channels.java:172) ~[?:?]
>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:141) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>     at java.lang.Thread.run(Thread.java:830) [?:?]
> 2021-03-19 15:26:10,385 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Exception while restoring keyed state backend for KeyedCoProcessOperator_55a6c4a5d36b0124ad78cbf6bd864bba_(2/8) from alternative (1/1), will retry while more alternatives are available.
> org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:362) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at java.lang.Thread.run(Thread.java:830) [?:?]
> Caused by: java.nio.channels.ClosedChannelException
>     at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150) ~[?:?]
>     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:266) ~[?:?]
>     at java.nio.channels.Channels.writeFullyImpl(Channels.java:74) ~[?:?]
>     at java.nio.channels.Channels.writeFully(Channels.java:97) ~[?:?]
>     at java.nio.channels.Channels$1.write(Channels.java:172) ~[?:?]
>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:141) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>
>
> -
> Dhanesh Arole
>
>
