Hi David,

Which S3 file system implementation are you using (e.g. flink-s3-fs-hadoop
or flink-s3-fs-presto)? If I'm not mistaken, the state backend should write
directly to the target file system. If this results in temporary files on
your TM, then it might be a problem with the file system implementation.
Having access to the logs would also help to better understand what's going
on.

Cheers,
Till

On Tue, Jul 14, 2020 at 11:57 AM David Magalhães <speeddra...@gmail.com>
wrote:

> Hi Congxian, sorry for the late reply.
>
> I'm using the filesystem state backend with an S3 path as the default in
> flink-conf.yaml (state.backend: filesystem).
> The Flink version I'm using is 1.10.1.
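>
> For reference, the relevant part of flink-conf.yaml looks roughly like this
> (the bucket name is a placeholder and the exact directory keys may differ):
>
>   state.backend: filesystem
>   state.checkpoints.dir: s3://<bucket>/checkpoints
>   state.savepoints.dir: s3://<bucket>/savepoints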
>
> By "The task manager did not clean up the state", I mean what the
> taskmanager was writing on disk the savepoint file, but it didn't delete it
> after the other taskmanager had an issue with the disk being full. The
> expected scenario would be both taskmanagers remove the savepoint they were
> trying to do from the disk, but only the one that reached 100% disk space
> use did it.
>
> For my scenario, I'm using the Flink REST API to start/deploy jobs. A
> retained checkpoint isn't supported via the REST API, and even if it were, I
> don't think it fits my scenario (stop a job, and start the new one from the
> saved state).
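>
> The flow is roughly the following (a sketch; the job/jar ids and the bucket
> are placeholders):
>
>   POST /jobs/<job-id>/stop
>     { "targetDirectory": "s3://<bucket>/savepoints", "drain": false }
>   GET  /jobs/<job-id>/savepoints/<trigger-id>    (poll until completed)
>   POST /jars/<jar-id>/run
>     { "savepointPath": "s3://<bucket>/savepoints/savepoint-..." }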
>
> Thanks,
> David
>
> On Sat, Jul 11, 2020 at 8:14 AM Congxian Qiu <qcx978132...@gmail.com>
> wrote:
>
>> Hi David
>>
>> As you say the savepoint uses the local disk, I assume that you use
>> RocksDBStateBackend.
>> Which Flink version are you using now?
>>
>> What do you mean by "The task manager did not clean up the state"? Does
>> that mean the local disk space was not cleaned up? Did the task encounter a
>> failover in this period?
>>
>> The snapshot speed will be limited by the network bandwidth and the local
>> IO performance.
>> IIUC, currently only checkpoints support local recovery.
>>
>> PS: If you want the snapshot to complete quickly, maybe you can try
>> retained checkpoints[1] and multi-threaded uploads[2].
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html#retained-checkpoints
>> [2] https://issues.apache.org/jira/browse/FLINK-11008
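>>
>> A rough sketch of both (the exact option names and APIs may differ between
>> versions):
>>
>>   // env is the StreamExecutionEnvironment of the job
>>   // keep the checkpoint when the job is cancelled, so it can be resumed from
>>   env.getCheckpointConfig().enableExternalizedCheckpoints(
>>       CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>
>>   // for [2], the number of upload threads for the RocksDB backend is a
>>   // flink-conf.yaml option, e.g.:
>>   // state.backend.rocksdb.checkpoint.transfer.thread.num: 4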
>>
>> Best,
>> Congxian
>>
>>
>> On Fri, Jul 10, 2020 at 7:37 PM David Magalhães <speeddra...@gmail.com>
>> wrote:
>>
>>> Hi, yesterday I was creating a savepoint (to S3, around 8 GB of state)
>>> using 2 TaskManagers (8 GB), and it failed because one of the task
>>> managers filled up its disk (it probably didn't have enough RAM to write
>>> the state to S3 directly; I don't know how much disk space it had, but it
>>> reached 100% usage while the other one reached 99%).
>>>
>>> After the crash, the task manager that reached 100% deleted the "failed
>>> savepoint" from the local disk, but the other one that reached 99% kept it.
>>> Shouldn't that task manager also clean up the failed state?
>>>
>>> After cleaning up the disk on that task manager, I increased the
>>> parallelism to 6 and created a new savepoint of 8 GB, and everything went
>>> smoothly, but it took 8 minutes for the new job started from that savepoint
>>> to begin processing.
>>>
>>> [image: flink_grafana.png]
>>> Here is the network IO from the 6 task managers used, and I have a few
>>> questions:
>>>
>>> - Isn't an average speed of 25 Mbps a bit low? What could be the
>>> limitation?
>>> - For 8 GB of state, that gives around 7 minutes to download it [ 8000 MB /
>>> (25 Mbps / 8 * 6 task managers) / 60 seconds ] (see the breakdown below),
>>> which should match the flat 7/8-minute part of the graph, after which the
>>> job started reading from the Kafka topic.
>>> - Can I mitigate this with task-local recovery [1]? Or is that only for
>>> checkpoints?
>>> - We are using *m5.xlarge* (4 vCPU, 16 GB RAM) with 2 slots per TM.
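>>>
>>> Working the estimate out: 25 Mbps ≈ 3.1 MB/s per task manager, so 6 task
>>> managers give ≈ 18.75 MB/s in aggregate, and 8000 MB / 18.75 MB/s ≈ 430
>>> seconds, i.e. about 7 minutes.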
>>>
>>> Thanks,
>>> David
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#task-local-recovery
>>>
>>
