Re: RocksDB StateBuilder unexpected exception

2021-03-23 Thread dhanesh arole
Hi Matthias,

Thanks for taking the time to help us with this.

You are right, there were lots of task cancellations. This exception causes
the job to restart, and the restart triggers the cancellations.


-
Dhanesh Arole


On Tue, Mar 23, 2021 at 9:27 AM Matthias Pohl 
wrote:

> Hi Dhanesh,
> thanks for reaching out to the Flink community. Checking the code, it
> looks like the OutputStream is added to a CloseableRegistry before writing
> to it [1].
>
> My suspicion, based on the exception cause, is that the
> CloseableRegistry was closed while the state was being restored. I tried to
> track down the source of the CloseableRegistry. It looks like it's handed
> down from the StreamTask [2].
>
> The StreamTask closes the CloseableRegistry either when cancellation is
> triggered or in the class's finalize method. Have you checked the logs to
> see whether any task cancellation was logged?
>
> Best,
> Matthias
>
> [1]
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L132
> [2]
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L269
>
> On Fri, Mar 19, 2021 at 5:07 PM dhanesh arole 
> wrote:
>
>> Hello Hivemind,
>>
>> We are running a stateful streaming job. Each task manager instance hosts
>> around 100GB of data. While restarting the task managers, we encountered
>> the following errors, which prevent the job from restarting.
>> Initially we thought it might be due to failing status checks of the
>> attached EBS volumes or burst balance exhaustion, but the AWS console is not
>> indicating any issue with the EBS volumes. Is there anything else we need to
>> look at that could potentially cause this exception? Also, it's quite
>> unclear what exactly causes the exception; any help on that would be much
>> appreciated.
>>
>> Flink version: 1.12.2_scala_2.11
>> Environment: Kubernetes on AWS
>> Volume Type: EBS, gp2 300GiB
>>
>> ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Caught unexpected exception.
>> java.nio.channels.ClosedChannelException: null
>>     at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150) ~[?:?]
>>     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:266) ~[?:?]
>>     at java.nio.channels.Channels.writeFullyImpl(Channels.java:74) ~[?:?]
>>     at java.nio.channels.Channels.writeFully(Channels.java:97) ~[?:?]
>>     at java.nio.channels.Channels$1.write(Channels.java:172) ~[?:?]
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:141) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807) ~[?:?]
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>>     at java.lang.Thread.run(Thread.java:830) [?:?]
>> 2021-03-19 15:26:10,385 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Exception while restoring keyed state backend for KeyedCoProcessOperator_55a6c4a5d36b0124ad78cbf6bd864bba_(2/8) from alternative (1/1), will retry while more alternatives are available.
>> org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:362) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> * 

Re: RocksDB StateBuilder unexpected exception

2021-03-23 Thread Matthias Pohl
Hi Dhanesh,
thanks for reaching out to the Flink community. Checking the code, it looks
like the OutputStream is added to a CloseableRegistry before writing to it
[1].

My suspicion, based on the exception cause, is that the CloseableRegistry
was closed while the state was being restored. I tried to track down the
source of the CloseableRegistry. It looks like it's handed down from the
StreamTask [2].

The StreamTask closes the CloseableRegistry either when cancellation is
triggered or in the class's finalize method. Have you checked the logs to
see whether any task cancellation was logged?
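The following minimal Java sketch shows the suspected mechanism without using Flink. It makes several assumptions. `SimpleCloseableRegistry` is a hypothetical stand-in for Flink's `CloseableRegistry`. Closing that registry stands in for task cancellation. The class and method names are illustrative only.

The sketch registers a file-backed `OutputStream` created with `Channels.newOutputStream`, closes the registry, and then writes to the stream. Closing the stream also closes the underlying `FileChannel`, so the write fails with `ClosedChannelException`. This should match the top of the trace (`FileChannelImpl.ensureOpen` reached through `Channels` writing), although the intermediate frames may differ between JDK versions.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

public class RegistryCancellationSketch {

    /** Hypothetical stand-in for Flink's CloseableRegistry: closing it closes every registered resource. */
    static final class SimpleCloseableRegistry implements Closeable {
        private final List<Closeable> resources = new ArrayList<>();
        private boolean closed;

        synchronized void register(Closeable resource) throws IOException {
            if (closed) {
                // Registering after close: release the resource right away and fail.
                resource.close();
                throw new IOException("Registry is already closed");
            }
            resources.add(resource);
        }

        @Override
        public synchronized void close() throws IOException {
            closed = true;
            for (Closeable resource : resources) {
                resource.close();
            }
            resources.clear();
        }
    }

    /** Returns the exception raised by a write after "cancellation", or null if the write succeeded. */
    static IOException simulateCancelledDownload() {
        try {
            Path target = Files.createTempFile("restore-sketch", ".sst");
            try {
                return writeAfterCancellation(target);
            } finally {
                Files.deleteIfExists(target);
            }
        } catch (IOException setupFailure) {
            // Failures while creating or deleting the temp file are not part of the scenario.
            throw new UncheckedIOException(setupFailure);
        }
    }

    private static IOException writeAfterCancellation(Path target) throws IOException {
        SimpleCloseableRegistry registry = new SimpleCloseableRegistry();
        // The stream is registered before any data is written to it.
        OutputStream out = Channels.newOutputStream(FileChannel.open(target, StandardOpenOption.WRITE));
        registry.register(out);

        // Simulated cancellation: closing the registry closes the stream and its FileChannel.
        registry.close();

        try {
            out.write(new byte[] {1, 2, 3});
            return null;
        } catch (IOException writeFailure) {
            return writeFailure;
        }
    }

    public static void main(String[] args) {
        IOException failure = simulateCancelledDownload();
        System.out.println(failure == null ? "write succeeded" : failure.getClass().getName());
    }
}
```

Running `main` prints `java.nio.channels.ClosedChannelException`. In the real restore path, the close would come from another thread while a download is still in flight, so the timing would not be deterministic. The sketch performs the two steps sequentially to keep the outcome reproducible.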

Best,
Matthias

[1]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L132
[2]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L269

On Fri, Mar 19, 2021 at 5:07 PM dhanesh arole 
wrote:

> Hello Hivemind,
>
> We are running a stateful streaming job. Each task manager instance hosts
> around 100GB of data. While restarting the task managers, we encountered
> the following errors, which prevent the job from restarting.
> Initially we thought it might be due to failing status checks of the
> attached EBS volumes or burst balance exhaustion, but the AWS console is not
> indicating any issue with the EBS volumes. Is there anything else we need to
> look at that could potentially cause this exception? Also, it's quite
> unclear what exactly causes the exception; any help on that would be much
> appreciated.
>
> Flink version: 1.12.2_scala_2.11
> Environment: Kubernetes on AWS
> Volume Type: EBS, gp2 300GiB
>
> ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Caught unexpected exception.
> java.nio.channels.ClosedChannelException: null
>     at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150) ~[?:?]
>     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:266) ~[?:?]
>     at java.nio.channels.Channels.writeFullyImpl(Channels.java:74) ~[?:?]
>     at java.nio.channels.Channels.writeFully(Channels.java:97) ~[?:?]
>     at java.nio.channels.Channels$1.write(Channels.java:172) ~[?:?]
>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:141) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>     at java.lang.Thread.run(Thread.java:830) [?:?]
> 2021-03-19 15:26:10,385 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Exception while restoring keyed state backend for KeyedCoProcessOperator_55a6c4a5d36b0124ad78cbf6bd864bba_(2/8) from alternative (1/1), will retry while more alternatives are available.
> org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:362) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> * at
>