Hi Guowei,

I thought Flink could support any HDFS-compatible object store, like the majority of Big Data frameworks do. So we just added the "flink-shaded-hadoop-2-uber" and "gcs-connector-latest-hadoop2" dependencies to the classpath; after that, using the "gs://" scheme seems to work (a rough sketch of the wiring is below).
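To be concrete, the wiring looks roughly like this. The property names are the standard GCS connector Hadoop settings; the jar placement, the HADOOP_CONF_DIR mechanism and the key file path are illustrative placeholders rather than our exact setup:

  <!-- Both jars sit on the Flink classpath, e.g. in flink/lib/:
       flink-shaded-hadoop-2-uber-<version>.jar and gcs-connector-latest-hadoop2.jar.
       A core-site.xml found via HADOOP_CONF_DIR then registers the gs:// scheme. -->
  <configuration>
    <property>
      <name>fs.gs.impl</name>
      <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
    </property>
    <property>
      <name>fs.AbstractFileSystem.gs.impl</name>
      <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
    </property>
    <property>
      <name>google.cloud.auth.service.account.enable</name>
      <value>true</value>
    </property>
    <property>
      <name>google.cloud.auth.service.account.json.keyfile</name>
      <value>/path/to/service-account-key.json</value>
    </property>
  </configuration>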
The relevant Flink configuration:

state.checkpoints.dir: gs://<REDACTED>/flink-checkpoints
state.savepoints.dir: gs://<REDACTED>/flink-savepoints

And yes, I noticed that retry logging too, but I'm not sure whether it's implemented on the Flink side or the GCS connector side? I probably need to dive deeper into the source code. And if it's implemented on the GCS connector side, will Flink wait for all the retries? That's why I asked about a potential timeout on the Flink side.

The JM log doesn't have much besides what I already posted. It's hard for me to share the whole log, but the RocksDB initialization part might be relevant:

16:03:41.987 [cluster-io-thread-3] INFO org.apache.flink.runtime.jobmaster.JobMaster - Using job/cluster config to configure application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'gs://<REDACTED>/flink-checkpoints', savepoints: 'gs://<REDACTED>/flink-savepoints', asynchronous: TRUE, fileStateThreshold: 1048576), localRocksDbDirectories=[/rocksdb], enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4, writeBatchSize=2097152}
16:03:41.988 [cluster-io-thread-3] INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using predefined options: FLASH_SSD_OPTIMIZED.
16:03:41.988 [cluster-io-thread-3] INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using application-defined options factory: DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=4, state.backend.rocksdb.block.blocksize=16 kb, state.backend.rocksdb.block.cache-size=64 mb}}.
16:03:41.988 [cluster-io-thread-3] INFO org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'gs://<REDACTED>/flink-checkpoints', savepoints: 'gs://<REDACTED>/flink-savepoints', asynchronous: TRUE, fileStateThreshold: 1048576), localRocksDbDirectories=[/rocksdb], enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4, writeBatchSize=2097152}

Thanks!

On Thu, Apr 1, 2021 at 2:30 AM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Yaroslav
>
> AFAIK there is no official GCS FileSystem support in FLINK. Does the GCS
> is implemented by yourself?
> Would you like to share the whole log of jm?
>
> BTW: From the following log I think the implementation has already some
> retry mechanism.
>
>>> Interrupted while sleeping before retry. Giving up after 1/10 retries
> for 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d
>
> Best,
> Guowei
>
>
> On Thu, Apr 1, 2021 at 12:50 PM Yaroslav Tkachenko <
> yaroslav.tkache...@shopify.com> wrote:
>
>> Hi everyone,
>>
>> I'm wondering if people have experienced issues with Taskmanager failure
>> recovery when dealing with a lot of state.
>>
>> I'm using 1.12.0 on Kubernetes, RocksDB backend with GCS for savepoints
>> and checkpoints. ~150 task managers with 4 slots each.
>>
>> When I run a pipeline without much state and kill one of the
>> taskmanagers, it takes a few minutes to recover (I see a few restarts),
>> but eventually, when a new replacement taskmanager is registered with
>> the jobmanager, things go back to healthy.
>>
>> But when I run a pipeline with a lot of state (1TB+) and kill one of the
>> taskmanagers, the pipeline never recovers, even after the replacement
>> taskmanager has joined. It just enters an infinite loop of restarts and
>> failures.
>>
>> On the jobmanager, I see an endless loop of state transitions:
>> RUNNING -> CANCELING -> CANCELED -> CREATED -> SCHEDULED -> DEPLOYING -> RUNNING.
>> It stays in RUNNING for a few seconds, but then transitions into FAILED
>> with a message like this:
>>
>> 22:28:07.338 [flink-akka.actor.default-dispatcher-239] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - <REDACTED> (569/624) (11cb45392108bb07d65fdd0fdc6b6741) switched from RUNNING to FAILED on 10.30.10.212:6122-ac6bba @ 10.30.10.212 (dataPort=43357).
>> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: readAddress(..) failed: Connection reset by peer (connection to '10.30.10.53/10.30.10.53:45789')
>>     at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>     ...
>> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
>>
>> Which, I guess, means a failed Taskmanager. And since there are not
>> enough task slots to run, it goes into this endless loop again. It's
>> never the same Taskmanager that fails.
>>
>> On the Taskmanager side, things look more interesting. I see a variety
>> of exceptions:
>>
>> org.apache.flink.runtime.taskmanager.Task - <REDACTED> (141/624)#7 (6f3651a49344754a1e7d1fb20cf2cba3) switched from RUNNING to FAILED.
>> org.apache.flink.runtime.jobmaster.ExecutionGraphException: The execution attempt 6f3651a49344754a1e7d1fb20cf2cba3 was not found.
>>
>> also
>>
>> WARNING: Failed read retry #1/10 for 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'. Sleeping...
>> java.nio.channels.ClosedByInterruptException
>>     at java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown Source)
>>     at java.base/java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown Source)
>>     at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:313)
>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.read(GoogleHadoopFSInputStream.java:118)
>>     at java.base/java.io.DataInputStream.read(Unknown Source)
>>     at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
>>     at java.base/java.io.InputStream.read(Unknown Source)
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:135)
>>     ...
>>
>> and
>>
>> SEVERE: Interrupted while sleeping before retry. Giving up after 1/10 retries for 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'
>> 20:52:46.894 [<REDACTED> (141/624)#7] ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder - Caught unexpected exception.
>> java.nio.channels.ClosedChannelException: null
>>     at sun.nio.ch.FileChannelImpl.ensureOpen(Unknown Source) ~[?:?]
>>     at sun.nio.ch.FileChannelImpl.write(Unknown Source) ~[?:?]
>>     at java.nio.channels.Channels.writeFullyImpl(Unknown Source) ~[?:?]
>>     at java.nio.channels.Channels.writeFully(Unknown Source) ~[?:?]
>>     at java.nio.channels.Channels$1.write(Unknown Source) ~[?:?]
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:140) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>
>> also
>>
>> 20:52:46.895 [<REDACTED> (141/624)#7] WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Exception while restoring keyed state backend for KeyedProcessOperator_ff97494a101b44a4b7a2913028a50243_(141/624) from alternative (1/1), will retry while more alternatives are available.
>> org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:328) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>     ...
>>
>> and a few of
>>
>> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to download data for state handles.
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:92) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>     ...
>> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: no bytes written
>>
>> Has anyone seen behaviour like this?
>>
>> My current theory: because it needs to download a lot of state from GCS,
>> the pipeline probably experiences some sort of GCS back-off issue (150
>> taskmanagers x 4 slots, plus 4
>> state.backend.rocksdb.checkpoint.transfer.thread.num), probably too many
>> read requests to the same GCS prefix? And I guess it doesn't finish in
>> the expected time and randomly fails. Maybe there is some kind of timeout
>> value I can tweak, so that downloading from GCS can take the time it
>> needs without failing prematurely?
>>
>> Any help is very appreciated!
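P.S. To put rough numbers on the back-off theory quoted above: if every recovering subtask downloads in parallel, the worst case is on the order of 150 taskmanagers x 4 slots x 4 transfer threads = ~2,400 concurrent reads against the same GCS prefix. Purely as an experiment (the keys are standard Flink options, but the values are guesses on my side, not a verified fix), the first flink-conf.yaml knobs I'd try are:

  # Fewer parallel download threads per recovering backend -> fewer concurrent GCS reads
  state.backend.rocksdb.checkpoint.transfer.thread.num: 1
  # RPC timeout (default 10 s), in case a slow recovery makes the default too tight
  akka.ask.timeout: 60 s
  # JM <-> TM heartbeat timeout in ms (default 50000)
  heartbeat.timeout: 120000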