Looked through the logs and didn't see anything fishy that indicated an
exception during checkpointing.
To make it clearer, here is the timeline (we use unaligned checkpoints, and
state size around 300Gb):

T1: Job1 was running
T2: Job1 was savepointed, brought down and replaced with Job2.
T3: Attempts to savepoint Job2 failed (timed out). Job2 was cancelled,
brought down and replaced by Job3 that was restored from extarnilized
checkpoint of Job2
T3: Attempts to savepoint Job3 failed (timed out). Job3 was cancelled,
brought down and replaced by Job4 that was restored from extarnilized
checkpoint of Job3
T4: We realized that jobs were timing out to savepoint due to local disk
throttling. We provisioned disk with more throughput and IO. Job4 was
cancelled, Job4 was deployed and restored from externilized checkpoint of
Job3, but failed as it couldn't find some files in the folder that belongs
to the checkpoint of *Job1*
T5: We tried to redeploy and restore from checkpoints of Job3 and Job2, but
all the attempts failed on reading files from the *folder that belongs to
the checkpoint of Job1*

We checked the content of the folder containing checkpoints of Job1, and it
has files. Not sure what is pointing tho missing files and what could've
removed them.

Any way we can figure out what could've happened? Is there a tool that can
read the checkpoint and check whether it is valid?

Alex

On Thu, Jun 3, 2021 at 2:12 PM Alexander Filipchik <afilipc...@gmail.com>
wrote:

> On the checkpoints -> what kind of issues should I check for? I was
> looking for metrics and it looks like they were reporting successful
> checkpoints. It looks like some files were removed in the shared folder,
> but I'm not sure how to check for what caused it.
>
> Savepoints were failing due to savepoint timeout timeout. Based on
> metrics, our attached disks were not fast enough (GCS regional disks are
> network disks and were throttled). The team cancelled the savepoint and
> just killed the kubernetes cluster. I assume some checkpoints were
> interrupted as the job triggers them one after another.
>
> Is there a known issue with termination during running checkpoint?
>
> Btw, we use the Flink Kube operator from Lyft.
>
> Alex
>
> On Thu, Jun 3, 2021 at 1:24 AM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> Is there anything in the Flink logs indicating issues with writing the
>> checkpoint data?
>> When the savepoint could not be created, was anything logged from Flink?
>> How did you shut down the cluster?
>>
>> On 6/3/2021 5:56 AM, Alexander Filipchik wrote:
>>
>> Hi,
>>
>> Trying to figure out what happened with our Flink job. We use flink
>> 1.11.1 and run a job with unaligned checkpoints and Rocks Db backend. The
>> whole state is around 300Gb judging by the size of savepoints.
>>
>> The job ran ok. At some point we tried to deploy new code, but we
>> couldn't take a save point as they were timing out. It looks like the
>> reason it was timing out was due to disk throttle (we use google regional
>> disks).
>> The new code was deployed using an externalized checkpoint, but it didn't
>> start as job was failing with:
>>
>> Caused by: java.io.FileNotFoundException: Item not found:
>> 'gs://../app/checkpoints/2834fa1c81dcf7c9578a8be9a371b0d1/shared/3477b236-fb4b-4a0d-be73-cb6fac62c007'.
>> Note, it is possible that the live version is still available but the
>> requested generation is deleted.
>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions
>> .createFileNotFoundException(GoogleCloudStorageExceptions.java:45)
>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(
>> GoogleCloudStorageImpl.java:653)
>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(
>> GoogleCloudStorageFileSystem.java:277)
>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(
>> GoogleHadoopFSInputStream.java:78)
>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(
>> GoogleHadoopFileSystemBase.java:620)
>>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
>>     at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem
>> .java:120)
>>     at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem
>> .java:37)
>>     at org.apache.flink.core.fs.
>> PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(
>> PluginFileSystemFactory.java:127)
>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(
>> SafetyNetWrapperFileSystem.java:85)
>>     at org.apache.flink.runtime.state.filesystem.FileStateHandle
>> .openInputStream(FileStateHandle.java:69)
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader
>> .downloadDataForStateHandle(RocksDBStateDownloader.java:126)
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader
>> .lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
>>     at org.apache.flink.util.function.ThrowingRunnable
>> .lambda$unchecked$0(ThrowingRunnable.java:50)
>>     at java.util.concurrent.CompletableFuture$AsyncRun.run(
>> CompletableFuture.java:1640)
>>     at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(
>> DirectExecutorService.java:211)
>>     at java.util.concurrent.CompletableFuture.asyncRunStage(
>> CompletableFuture.java:1654)
>>     at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture
>> .java:1871)
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader
>> .downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader
>> .transferAllStateDataToDirectory(RocksDBStateDownloader.java:66)
>>     at org.apache.flink.contrib.streaming.state.restore.
>> RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(
>> RocksDBIncrementalRestoreOperation.java:230)
>>     at org.apache.flink.contrib.streaming.state.restore.
>> RocksDBIncrementalRestoreOperation.restoreFromRemoteState(
>> RocksDBIncrementalRestoreOperation.java:195)
>>     at org.apache.flink.contrib.streaming.state.restore.
>> RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(
>> RocksDBIncrementalRestoreOperation.java:169)
>>     at org.apache.flink.contrib.streaming.state.restore.
>> RocksDBIncrementalRestoreOperation.restore(
>> RocksDBIncrementalRestoreOperation.java:155)
>>     at org.apache.flink.contrib.streaming.state.
>> RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder
>> .java:270)
>>     ... 15 more
>> We tried to roll back the code, we tried different checkpoints, but all
>> the attempts failed with the same error. The job ID in the error is not
>> from the same checkpoint path, it looks like restore logic
>> looks back at previous jobs, as all the checkpoints after
>> 2834fa1c81dcf7c9578a8be9a371b0d1 are failing to restore with the same error.
>> We looked at different checkpoints and found that some of them miss
>> metadata file and can't be used for restoration.
>> We also use ZK for HA, and we cleaned up the state there between
>> deployments to make sure the non existent file
>> is not coming from there.
>> We decided to drop the state as we have means to repopulate it, but it
>> would be great to get to the bottom of it. Any help will be appreciated.
>>
>> Alex
>>
>>
>>

Reply via email to