On the checkpoints: what kind of issues should I check for? I looked at the
metrics and they were reporting successful checkpoints. It does look like some
files were removed from the shared folder, but I'm not sure how to check what
caused it.
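
In case it helps frame the question, this is a rough sketch of what I was
thinking of trying (it assumes object versioning is enabled on the checkpoint
bucket and uses the google-cloud-storage Java client; the bucket name below is
a placeholder) to list deleted generations under the shared/ prefix and see
when those files disappeared. If versioning is off, I guess the bucket's audit
or access logs, if enabled, would be the only record.

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class FindDeletedSharedFiles {
    public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        String bucket = "my-flink-bucket";  // placeholder, not our real bucket
        String prefix = "app/checkpoints/2834fa1c81dcf7c9578a8be9a371b0d1/shared/";
        for (Blob blob : storage
                .list(bucket,
                      Storage.BlobListOption.prefix(prefix),
                      Storage.BlobListOption.versions(true))  // include noncurrent generations
                .iterateAll()) {
            if (blob.getDeleteTime() != null) {  // set only for deleted generations
                System.out.printf("%s gen=%d deleted at %d%n",
                        blob.getName(), blob.getGeneration(), blob.getDeleteTime());
            }
        }
    }
}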

Savepoints were failing due to the savepoint timeout. Based on metrics, our
attached disks were not fast enough (GCP regional disks are network-attached
and were being throttled). The team cancelled the savepoint and just killed
the Kubernetes cluster. I assume some checkpoints were interrupted, as the job
triggers them one after another.
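
For context, the checkpointing setup is roughly along these lines (a sketch
only; the path, interval, and the raised timeout below are placeholders, not
our real config):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Incremental RocksDB state backend writing to GCS (path is a placeholder).
        env.setStateBackend(new RocksDBStateBackend("gs://my-bucket/app/checkpoints", true));

        env.enableCheckpointing(60_000);  // placeholder interval
        CheckpointConfig cc = env.getCheckpointConfig();
        cc.enableUnalignedCheckpoints();
        // Raising this is presumably what we would need for the timing-out
        // savepoints, since they go through the same checkpoint coordinator.
        cc.setCheckpointTimeout(30 * 60_000);
        cc.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}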

Is there a known issue with terminating the cluster while a checkpoint is running?

Btw, we use the Flink Kube operator from Lyft.

Alex

On Thu, Jun 3, 2021 at 1:24 AM Chesnay Schepler <ches...@apache.org> wrote:

> Is there anything in the Flink logs indicating issues with writing the
> checkpoint data?
> When the savepoint could not be created, was anything logged from Flink?
> How did you shut down the cluster?
>
> On 6/3/2021 5:56 AM, Alexander Filipchik wrote:
>
> Hi,
>
> Trying to figure out what happened with our Flink job. We use Flink 1.11.1
> and run a job with unaligned checkpoints and the RocksDB backend. The whole
> state is around 300 GB, judging by the size of savepoints.
>
> The job ran fine. At some point we tried to deploy new code, but we couldn't
> take a savepoint as they were timing out. It looks like they were timing out
> because of disk throttling (we use Google regional disks).
> The new code was deployed using an externalized checkpoint, but it didn't
> start, as the job was failing with:
>
> Caused by: java.io.FileNotFoundException: Item not found:
> 'gs://../app/checkpoints/2834fa1c81dcf7c9578a8be9a371b0d1/shared/3477b236-fb4b-4a0d-be73-cb6fac62c007'.
> Note, it is possible that the live version is still available but the
> requested generation is deleted.
>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:45)
>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:653)
>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:277)
>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:78)
>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:620)
>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
>     at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem.java:120)
>     at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem.java:37)
>     at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
>     at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:126)
>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
>     at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
>     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
>     at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>     at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1654)
>     at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1871)
>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:66)
>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:169)
>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:155)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
>     ... 15 more
> We tried to roll back the code and we tried different checkpoints, but all
> the attempts failed with the same error. The job ID in the error does not
> match the checkpoint path we restored from; it looks like the restore logic
> reaches back into previous jobs, as all the checkpoints after
> 2834fa1c81dcf7c9578a8be9a371b0d1 fail to restore with the same error.
> We looked at different checkpoints and found that some of them are missing
> the metadata file and can't be used for restoration.
> We also use ZooKeeper for HA, and we cleaned up the state there between
> deployments to make sure the reference to the non-existent file
> is not coming from there.
> We decided to drop the state, as we have the means to repopulate it, but it
> would be great to get to the bottom of this. Any help will be appreciated.
>
> Alex
>
>
>
