Small correction: in T4 and T5 I mean Job2, not Job1 (as Job1 was savepointed).

Thank you,
Alex

> On Jun 4, 2021, at 3:07 PM, Alexander Filipchik <afilipc...@gmail.com> wrote:
> 
> 
> Looked through the logs and didn't see anything fishy that would indicate an 
> exception during checkpointing. 
> To make it clearer, here is the timeline (we use unaligned checkpoints, and the 
> state size is around 300 GB):
>  
> T1: Job1 was running
> T2: Job1 was savepointed, brought down and replaced with Job2. 
> T3: Attempts to savepoint Job2 failed (timed out). Job2 was cancelled, 
> brought down and replaced by Job3, which was restored from the externalized 
> checkpoint of Job2.
> T3: Attempts to savepoint Job3 failed (timed out). Job3 was cancelled, 
> brought down and replaced by Job4, which was restored from the externalized 
> checkpoint of Job3.
> T4: We realized that the savepoints were timing out due to local disk 
> throttling. We provisioned disks with more throughput and IO. Job4 was 
> cancelled, then redeployed and restored from the externalized checkpoint of 
> Job3, but it failed because it couldn't find some files in the folder 
> belonging to the checkpoint of Job1.
> T5: We tried to redeploy and restore from the checkpoints of Job3 and Job2, 
> but all the attempts failed on reading files from the folder belonging to the 
> checkpoint of Job1.
> 
> We checked the contents of the folder containing the checkpoints of Job1, and 
> it still has files in it. We are not sure what is pointing to the missing 
> files or what could've removed them.
> 
> Any way we can figure out what could've happened? Is there a tool that can 
> read the checkpoint and check whether it is valid?
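> 
> In case it helps, this is roughly how I imagine such a check could be scripted 
> against the Flink 1.11 internals: load a checkpoint's _metadata file and verify 
> that every file referenced by the incremental RocksDB handles still exists. 
> This is only a sketch: Checkpoints/CheckpointMetadata and friends are internal 
> classes, the exact method names below are my assumption and may need adjusting 
> per Flink version, and it assumes the GCS filesystem is available on the 
> classpath so gs:// paths resolve.
> 
> import java.io.DataInputStream;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.runtime.checkpoint.Checkpoints;
> import org.apache.flink.runtime.checkpoint.OperatorState;
> import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
> import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
> import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
> import org.apache.flink.runtime.state.KeyedStateHandle;
> import org.apache.flink.runtime.state.StreamStateHandle;
> import org.apache.flink.runtime.state.filesystem.FileStateHandle;
> 
> public class CheckpointSanityCheck {
> 
>     public static void main(String[] args) throws Exception {
>         // args[0]: path to the checkpoint's _metadata file, e.g.
>         // gs://<bucket>/app/checkpoints/<job-id>/chk-<n>/_metadata
>         Path metadataFile = new Path(args[0]);
>         FileSystem fs = metadataFile.getFileSystem();
> 
>         CheckpointMetadata metadata;
>         try (DataInputStream in = new DataInputStream(fs.open(metadataFile))) {
>             metadata = Checkpoints.loadCheckpointMetadata(
>                     in, CheckpointSanityCheck.class.getClassLoader(), args[0]);
>         }
> 
>         // Walk the keyed state handles and check that every file the
>         // incremental checkpoint points at (shared and private) still exists.
>         for (OperatorState operatorState : metadata.getOperatorStates()) {
>             for (OperatorSubtaskState subtask : operatorState.getStates()) {
>                 for (KeyedStateHandle handle : subtask.getManagedKeyedState()) {
>                     if (handle instanceof IncrementalRemoteKeyedStateHandle) {
>                         IncrementalRemoteKeyedStateHandle inc =
>                                 (IncrementalRemoteKeyedStateHandle) handle;
>                         for (StreamStateHandle h : inc.getSharedState().values()) {
>                             checkExists(h);
>                         }
>                         for (StreamStateHandle h : inc.getPrivateState().values()) {
>                             checkExists(h);
>                         }
>                     }
>                 }
>             }
>         }
>     }
> 
>     private static void checkExists(StreamStateHandle handle) throws Exception {
>         // Only file-backed handles point at files on GCS; byte-stream handles
>         // are inlined in the metadata itself.
>         if (handle instanceof FileStateHandle) {
>             Path path = ((FileStateHandle) handle).getFilePath();
>             boolean ok = path.getFileSystem().exists(path);
>             System.out.println((ok ? "OK      " : "MISSING ") + path);
>         }
>     }
> }
> 
> A file flagged MISSING here would match the FileNotFoundException we are seeing.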
> 
> Alex
> 
>> On Thu, Jun 3, 2021 at 2:12 PM Alexander Filipchik <afilipc...@gmail.com> 
>> wrote:
>> On the checkpoints -> what kind of issues should I check for? I looked at the 
>> metrics, and they were reporting successful checkpoints. It looks like some 
>> files were removed from the shared folder, but I'm not sure how to check what 
>> caused it. 
>> 
>> Savepoints were failing due to the savepoint timeout. Based on metrics, our 
>> attached disks were not fast enough (Google regional disks are network disks 
>> and were being throttled). The team cancelled the savepoint and just killed 
>> the Kubernetes cluster. I assume some checkpoints were interrupted, as the 
>> job triggers them one after another. 
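>> 
>> As far as we can tell, the relevant knob is the checkpoint timeout, which we 
>> believe also applies to savepoints. Below is a rough sketch of where it would 
>> be raised in the job setup; the 30-minute value is just an illustration, and 
>> with the operator we would set the equivalent execution.checkpointing.timeout 
>> in the flink-conf it renders.
>> 
>> import org.apache.flink.streaming.api.environment.CheckpointConfig;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> 
>> // Inside the job's main(), where the job graph is built:
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> 
>> // The default checkpoint timeout is 10 minutes; with throttled disks and
>> // ~300 GB of state that was not enough, so savepoints kept expiring.
>> env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000L);
>> 
>> // Keep the last externalized checkpoint on cancellation so the next job can
>> // restore from it (this is what we rely on between the JobN deployments).
>> env.getCheckpointConfig().enableExternalizedCheckpoints(
>>         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);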
>> 
>> Is there a known issue with terminating the cluster while a checkpoint is running? 
>> 
>> Btw, we use the Flink Kube operator from Lyft.
>> 
>> Alex
>> 
>> On Thu, Jun 3, 2021 at 1:24 AM Chesnay Schepler <ches...@apache.org> wrote:
>>> Is there anything in the Flink logs indicating issues with writing the 
>>> checkpoint data?
>>> When the savepoint could not be created, was anything logged from Flink? 
>>> How did you shut down the cluster?
>>> 
>>> On 6/3/2021 5:56 AM, Alexander Filipchik wrote:
>>>> Hi, 
>>>> 
>>>> Trying to figure out what happened with our Flink job. We use Flink 1.11.1 
>>>> and run a job with unaligned checkpoints and the RocksDB backend. The whole 
>>>> state is around 300 GB, judging by the size of the savepoints. 
>>>> 
>>>> The job ran fine. At some point we tried to deploy new code, but we couldn't 
>>>> take a savepoint, as the attempts were timing out. It looks like the timeouts 
>>>> were caused by disk throttling (we use Google regional disks). 
>>>> The new code was deployed using an externalized checkpoint, but it didn't 
>>>> start, as the job was failing with: 
>>>> 
>>>> Caused by: java.io.FileNotFoundException: Item not found: 
>>>> 'gs://../app/checkpoints/2834fa1c81dcf7c9578a8be9a371b0d1/shared/3477b236-fb4b-4a0d-be73-cb6fac62c007'.
>>>>  Note, it is possible that the live version is still available but the 
>>>> requested generation is deleted.
>>>>     at 
>>>> com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:45)
>>>>     at 
>>>> com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:653)
>>>>     at 
>>>> com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:277)
>>>>     at 
>>>> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:78)
>>>>     at 
>>>> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:620)
>>>>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
>>>>     at 
>>>> com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem.java:120)
>>>>     at 
>>>> com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem.java:37)
>>>>     at 
>>>> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
>>>>     at 
>>>> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
>>>>     at 
>>>> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
>>>>     at 
>>>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:126)
>>>>     at 
>>>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
>>>>     at 
>>>> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
>>>>     at 
>>>> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
>>>>     at 
>>>> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>>>>     at 
>>>> java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1654)
>>>>     at 
>>>> java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1871)
>>>>     at 
>>>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
>>>>     at 
>>>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:66)
>>>>     at 
>>>> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
>>>>     at 
>>>> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
>>>>     at 
>>>> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:169)
>>>>     at 
>>>> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:155)
>>>>     at 
>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
>>>>     ... 15 more
>>>> 
>>>> We tried to roll back the code and we tried different checkpoints, but all 
>>>> the attempts failed with the same error. The job ID in the error does not 
>>>> belong to the checkpoint path we restored from; it looks like the restore 
>>>> logic reaches back into previous jobs, as all the checkpoints taken after 
>>>> 2834fa1c81dcf7c9578a8be9a371b0d1 fail to restore with the same error.
>>>> We also looked at different checkpoints and found that some of them are 
>>>> missing the metadata file and can't be used for restoration.
>>>> We also use ZK for HA, and we cleaned up its state between deployments to 
>>>> make sure the reference to the non-existent file is not coming from there. 
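>>>> 
>>>> For anyone who wants to run a similar check for missing metadata, something 
>>>> along these lines should be enough (a sketch using Flink's FileSystem API; 
>>>> the checkpoint root path is a placeholder, and the GCS filesystem plugin has 
>>>> to be on the classpath for gs:// paths to resolve):
>>>> 
>>>> import org.apache.flink.core.fs.FileStatus;
>>>> import org.apache.flink.core.fs.FileSystem;
>>>> import org.apache.flink.core.fs.Path;
>>>> 
>>>> public class FindIncompleteCheckpoints {
>>>>     public static void main(String[] args) throws Exception {
>>>>         // args[0]: the job's checkpoint directory, e.g.
>>>>         // gs://<bucket>/app/checkpoints/<job-id>/
>>>>         Path checkpointRoot = new Path(args[0]);
>>>>         FileSystem fs = checkpointRoot.getFileSystem();
>>>> 
>>>>         // Each retained checkpoint lives in a chk-<n> sub-directory and is
>>>>         // only restorable if its _metadata file is still there.
>>>>         for (FileStatus status : fs.listStatus(checkpointRoot)) {
>>>>             if (!status.isDir() || !status.getPath().getName().startsWith("chk-")) {
>>>>                 continue;
>>>>             }
>>>>             Path metadata = new Path(status.getPath(), "_metadata");
>>>>             System.out.println(
>>>>                     (fs.exists(metadata) ? "complete    " : "no metadata ")
>>>>                             + status.getPath());
>>>>         }
>>>>     }
>>>> }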
>>>> 
>>>> We decided to drop the state, as we have the means to repopulate it, but it 
>>>> would be great to get to the bottom of this. Any help will be appreciated. 
>>>> 
>>>> Alex 
>>> 
