Hi Alex,

A quick question. Are you using incremental checkpoints?
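(Asking because, with incremental RocksDB checkpoints, the SST files under an
earlier job's shared/ directory can still be referenced by the checkpoints of a
job that was restored from it, which could explain a restore reading from
Job1's checkpoint folder.) In case it helps, here is a minimal sketch of how
incremental checkpointing is typically enabled with the RocksDB backend; the
checkpoint URI is just a placeholder:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class IncrementalCheckpointSetup {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Second constructor argument = enable incremental checkpoints.
            // The GCS URI is a placeholder, not your real path.
            env.setStateBackend(new RocksDBStateBackend("gs://<bucket>/app/checkpoints", true));
            env.enableCheckpointing(60_000); // example: one checkpoint per minute
            // ... build and execute the actual pipeline here ...
        }
    }

Equivalently, state.backend.incremental: true in flink-conf.yaml.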

Best, Piotrek

On Sat, Jun 5, 2021 at 21:23 <afilipc...@gmail.com> wrote:

> Small correction: in T4 and T5 I mean Job2, not Job1 (as Job1 was the one
> that was savepointed).
>
> Thank you,
> Alex
>
> On Jun 4, 2021, at 3:07 PM, Alexander Filipchik <afilipc...@gmail.com>
> wrote:
>
>
> I looked through the logs and didn't see anything fishy that would indicate
> an exception during checkpointing.
> To make it clearer, here is the timeline (we use unaligned checkpoints, and
> the state size is around 300 GB):
>
> T1: Job1 was running
> T2: Job1 was savepointed, brought down and replaced with Job2.
> T3: Attempts to savepoint Job2 failed (timed out). Job2 was cancelled,
> brought down and replaced by Job3, which was restored from the externalized
> checkpoint of Job2.
> T3: Attempts to savepoint Job3 failed (timed out). Job3 was cancelled,
> brought down and replaced by Job4, which was restored from the externalized
> checkpoint of Job3.
> T4: We realized that the savepoints were timing out due to local disk
> throttling. We provisioned disks with more throughput and IO. Job4 was
> cancelled, then redeployed and restored from the externalized checkpoint of
> Job3, but it failed because it couldn't find some files in the folder that
> belongs to the checkpoint of *Job1*.
> T5: We tried to redeploy and restore from the checkpoints of Job3 and Job2,
> but all the attempts failed while reading files from the *folder that
> belongs to the checkpoint of Job1*.
>
> We checked the contents of the folder containing the checkpoints of Job1,
> and it does have files. We're not sure what is pointing to the missing files
> or what could have removed them.
>
> Is there any way we can figure out what could have happened? Is there a tool
> that can read a checkpoint and check whether it is valid?
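>
> Regarding a tool that can read a checkpoint: one option we're looking at is
> the State Processor API, which can load a retained checkpoint (as long as its
> _metadata file is present) and read keyed state from it; if the shared files
> are gone, the read should fail with the same FileNotFoundException. A rough
> sketch; the path, operator uid, key type and state name are all placeholders:
>
>     import org.apache.flink.api.common.state.ValueState;
>     import org.apache.flink.api.common.state.ValueStateDescriptor;
>     import org.apache.flink.api.java.DataSet;
>     import org.apache.flink.api.java.ExecutionEnvironment;
>     import org.apache.flink.configuration.Configuration;
>     import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>     import org.apache.flink.state.api.ExistingSavepoint;
>     import org.apache.flink.state.api.Savepoint;
>     import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
>     import org.apache.flink.util.Collector;
>
>     public class CheckpointSanityCheck {
>         public static void main(String[] args) throws Exception {
>             ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>             // Placeholder paths; point them at the retained checkpoint to test.
>             ExistingSavepoint cp = Savepoint.load(
>                     env,
>                     "gs://<bucket>/app/checkpoints/<job-id>/chk-<n>",
>                     new RocksDBStateBackend("gs://<bucket>/tmp"));
>             // Reading keyed state forces the shared files to be downloaded,
>             // so missing files should surface as FileNotFoundException here.
>             DataSet<String> keys = cp.readKeyedState("my-operator-uid", new Reader());
>             System.out.println("readable keys: " + keys.count());
>         }
>
>         static class Reader extends KeyedStateReaderFunction<String, String> {
>             private ValueState<String> state;
>
>             @Override
>             public void open(Configuration conf) {
>                 // "my-state" is a placeholder for a state name registered by the operator.
>                 state = getRuntimeContext().getState(
>                         new ValueStateDescriptor<>("my-state", String.class));
>             }
>
>             @Override
>             public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
>                 state.value(); // touch the state so its backing files are actually read
>                 out.collect(key);
>             }
>         }
>     }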
>
> Alex
>
> On Thu, Jun 3, 2021 at 2:12 PM Alexander Filipchik <afilipc...@gmail.com>
> wrote:
>
>> On the checkpoints: what kind of issues should I check for? I was looking
>> at the metrics, and they were reporting successful checkpoints. It looks
>> like some files were removed from the shared folder, but I'm not sure how
>> to find out what caused it.
>>
>> Savepoints were failing due to the savepoint timeout. Based on the metrics,
>> our attached disks were not fast enough (GCS regional disks are network
>> disks and were being throttled). The team cancelled the savepoint and just
>> killed the Kubernetes cluster. I assume some checkpoints were interrupted,
>> as the job triggers them one after another.
>>
>> Is there a known issue with terminating a job while a checkpoint is running?
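>>
>> One thing we're considering while the disks are the bottleneck is raising
>> the checkpoint timeout (savepoints go through the same coordinator, so as
>> far as I know the same timeout applies) and making sure externalized
>> checkpoints are retained on cancellation. A rough sketch with illustrative
>> values:
>>
>>     import org.apache.flink.streaming.api.environment.CheckpointConfig;
>>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>>     public class CheckpointTimeoutSetup {
>>         public static void main(String[] args) throws Exception {
>>             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>             env.enableCheckpointing(60_000); // illustrative interval: 60s
>>             CheckpointConfig cfg = env.getCheckpointConfig();
>>             // Give throttled disks more time before a checkpoint/savepoint is declared failed.
>>             cfg.setCheckpointTimeout(30 * 60 * 1000L);
>>             cfg.enableUnalignedCheckpoints(); // we already run with these
>>             // Keep the externalized checkpoint if the job is cancelled,
>>             // so there is always something left to restore from.
>>             cfg.enableExternalizedCheckpoints(
>>                     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>         }
>>     }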
>>
>> Btw, we use the Flink Kube operator from Lyft.
>>
>> Alex
>>
>> On Thu, Jun 3, 2021 at 1:24 AM Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> Is there anything in the Flink logs indicating issues with writing the
>>> checkpoint data?
>>> When the savepoint could not be created, was anything logged from Flink?
>>> How did you shut down the cluster?
>>>
>>> On 6/3/2021 5:56 AM, Alexander Filipchik wrote:
>>>
>>> Hi,
>>>
>>> We're trying to figure out what happened with our Flink job. We use Flink
>>> 1.11.1 and run a job with unaligned checkpoints and the RocksDB backend.
>>> The whole state is around 300 GB, judging by the size of the savepoints.
>>>
>>> The job ran OK. At some point we tried to deploy new code, but we couldn't
>>> take a savepoint, as the attempts were timing out. It looks like they were
>>> timing out due to disk throttling (we use Google regional disks).
>>> The new code was deployed using an externalized checkpoint, but it didn't
>>> start, as the job was failing with:
>>>
>>> Caused by: java.io.FileNotFoundException: Item not found:
>>> 'gs://../app/checkpoints/2834fa1c81dcf7c9578a8be9a371b0d1/shared/3477b236-fb4b-4a0d-be73-cb6fac62c007'.
>>> Note, it is possible that the live version is still available but the
>>> requested generation is deleted.
>>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:45)
>>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:653)
>>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:277)
>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:78)
>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:620)
>>>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
>>>     at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem.java:120)
>>>     at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem.java:37)
>>>     at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
>>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
>>>     at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:126)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
>>>     at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
>>>     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
>>>     at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>>>     at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1654)
>>>     at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1871)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:66)
>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:169)
>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:155)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
>>>     ... 15 more
>>> We tried to roll back the code and we tried different checkpoints, but all
>>> the attempts failed with the same error. The job ID in the error is not
>>> from the same checkpoint path; it looks like the restore logic looks back
>>> at previous jobs, as all the checkpoints after
>>> 2834fa1c81dcf7c9578a8be9a371b0d1 fail to restore with the same error.
>>> We looked at different checkpoints and found that some of them are missing
>>> the metadata file and can't be used for restoration.
>>> We also use ZooKeeper for HA, and we cleaned up the state there between
>>> deployments to make sure the non-existent file is not coming from there.
>>> We decided to drop the state, as we have a way to repopulate it, but it
>>> would be great to get to the bottom of this. Any help will be appreciated.
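>>>
>>> In case anyone wants to reproduce the metadata check, something like the
>>> following lists which chk-* directories still contain a _metadata file
>>> (a sketch; the path is a placeholder and the GCS filesystem plugin has to
>>> be on the classpath):
>>>
>>>     import org.apache.flink.core.fs.FileStatus;
>>>     import org.apache.flink.core.fs.FileSystem;
>>>     import org.apache.flink.core.fs.Path;
>>>
>>>     public class ListCheckpointMetadata {
>>>         public static void main(String[] args) throws Exception {
>>>             // Placeholder; point it at the job's checkpoint directory.
>>>             Path checkpointRoot = new Path("gs://<bucket>/app/checkpoints/<job-id>");
>>>             FileSystem fs = checkpointRoot.getFileSystem();
>>>             for (FileStatus status : fs.listStatus(checkpointRoot)) {
>>>                 if (status.isDir() && status.getPath().getName().startsWith("chk-")) {
>>>                     boolean hasMetadata = fs.exists(new Path(status.getPath(), "_metadata"));
>>>                     System.out.println(status.getPath() + " -> _metadata present: " + hasMetadata);
>>>                 }
>>>             }
>>>         }
>>>     }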
>>>
>>> Alex
>>>
>>>
>>>
