[ 
https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852132#comment-17852132
 ] 

Mingliang Liu commented on FLINK-35502:
---------------------------------------

I guess in my day job I don't see user requests that need to recover from any 
point in the past days. I think it works just fine to recover from recent 
checkpoints in the past days. And compressing is a good improvement as data is 
getting large.

> compress the checkpoint metadata generated by ZK/ETCD HA Services
> -----------------------------------------------------------------
>
>                 Key: FLINK-35502
>                 URL: https://issues.apache.org/jira/browse/FLINK-35502
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Ying Z
>            Priority: Major
>
> In the implementation of Flink HA, the metadata of checkpoints is stored in 
> either Zookeeper (ZK HA) or ETCD (K8S HA), such as:
> {code:java}
> checkpointID-0000000000000036044: xxxx
> checkpointID-0000000000000036045: xxxx
> ...
> ... {code}
> However, neither of these are designed to store excessive amounts of data. If 
> the 
> [state.checkpoints.num-retained]([https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained])
>  setting is set too large, it can easily cause abnormalities in ZK/ETCD. 
> The error log when set state.checkpoints.num-retained to 1500:
> {code:java}
> Caused by: org.apache.flink.util.SerializedThrowable: 
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: 
> PUT at: 
> https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader. 
> Message: ConfigMap "xxx-jobmanager-leader" is invalid: 0J:
> Too long: must have at most 1048576 bytes. Received status: 
> Status(apiVersion=v1, code=422, 
> details=StatusDetails(causes=(StatusCause(field=[J, message=Too long: must 
> have at most 1048576 bytes, reason=FieldValueTooLong, 
> additionalProperties={})l, group=null, kind=ConfigMap, 
> name=xxx-jobmanager-leader, retryAfterSeconds=null, uid=null, 
> additionalProperties=(}), kind=Status, message=ConfigMap 
> "xxx-jobmanager-leader" is invalid: [): Too long: must have at most 1048576 
> bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, 
> resourceVersion=null, selfLink=null, additionalProperties={}), 
> reason=Invalid, status=Failure, additionalProperties=(}). {code}
> In Flink's code, all checkpoint metadata are updated at the same time, and 
> The checkpoint metadata contains many repeated bytes, therefore it can 
> achieve a very good compression ratio.
> Therefore, I suggest compressing the data when writing checkpoints and 
> decompressing it when reading, to reduce storage pressure and improve IO 
> efficiency.
> Here is the sample code, and reduce the metadata size from 1M bytes to 30K.
> {code:java}
> // Map -> Json
> ObjectMapper objectMapper = new ObjectMapper();
> String checkpointJson = objectMapper.writeValueAsString(checkpointMap); // // 
> copress and base64  
> String compressedBase64 = compressAndEncode(checkpointJson); 
> compressedData.put("checkpoint-all", compressedBase64);{code}
> {code:java}
>     private static String compressAndEncode(String data) throws IOException {
>         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
>         try (GZIPOutputStream gzipOutputStream = new 
> GZIPOutputStream(outputStream))
> {             gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));  
>        }
>         byte[] compressedData = outputStream.toByteArray();
>         return Base64.getEncoder().encodeToString(compressedData);
>     } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to