Ying Z created FLINK-35502:
------------------------------

             Summary: Compress the checkpoint metadata in ZK/ETCD HA services
                 Key: FLINK-35502
                 URL: https://issues.apache.org/jira/browse/FLINK-35502
             Project: Flink
          Issue Type: Improvement
            Reporter: Ying Z


In the implementation of Flink HA, the checkpoint metadata is stored in either 
ZooKeeper (ZK HA) or ETCD (K8S HA), for example:

```
checkpointID-0000000000000036044: xxxx
checkpointID-0000000000000036045: xxxx
...
...
```

However, neither of these stores is designed to hold large amounts of data. If 
[state.checkpoints.num-retained](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained)
is set too large, it can easily cause failures in ZK/ETCD.

The error log when state.checkpoints.num-retained is set to 1500:

```
Caused by: org.apache.flink.util.SerializedThrowable:
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT
at: https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader.
Message: ConfigMap "xxx-jobmanager-leader" is invalid: []:
Too long: must have at most 1048576 bytes. Received status:
Status(apiVersion=v1, code=422,
details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must have
at most 1048576 bytes, reason=FieldValueTooLong, additionalProperties={})],
group=null, kind=ConfigMap, name=xxx-jobmanager-leader, retryAfterSeconds=null,
uid=null, additionalProperties={}), kind=Status, message=ConfigMap
"xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576
bytes, metadata=ListMeta(_continue=null, remainingItemCount=null,
resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid,
status=Failure, additionalProperties={}).
```

In Flink's code, all checkpoint metadata entries are updated at the same time, 
and the metadata contains many repeated bytes, so it can achieve a very good 
compression ratio.

I therefore suggest compressing the data when writing checkpoint metadata and 
decompressing it when reading, to reduce storage pressure and improve I/O 
efficiency.

Here is sample code for the write path (wrapped in a self-contained class for 
illustration; the class and method names are mine). In my test it reduced the 
metadata size from about 1 MB to 30 KB.

```java
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.zip.GZIPOutputStream;

public class CheckpointMetadataCompressor {

    public static void writeCompressed(Map<String, String> checkpointMap,
                                       Map<String, String> compressedData) throws IOException {
        // Map -> JSON
        ObjectMapper objectMapper = new ObjectMapper();
        String checkpointJson = objectMapper.writeValueAsString(checkpointMap);
        // compress and Base64-encode
        String compressedBase64 = compressAndEncode(checkpointJson);
        // store all checkpoint entries under a single key
        compressedData.put("checkpoint-all", compressedBase64);
    }

    private static String compressAndEncode(String data) throws IOException {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        // GZIP the JSON payload
        try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
            gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
        }
        byte[] compressedData = outputStream.toByteArray();
        // Base64-encode so the compressed bytes can be stored as a string value
        return Base64.getEncoder().encodeToString(compressedData);
    }
}
```
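
For the read path, here is a minimal decompression sketch. This is an addition 
for illustration (the class and the `decodeAndDecompress` helper name are 
hypothetical, not from any patch); it simply reverses the GZIP + Base64 
encoding above:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPInputStream;

public class CheckpointMetadataDecompressor {

    // Hypothetical helper: reverses compressAndEncode by Base64-decoding
    // and then GZIP-decompressing back to the original JSON string.
    static String decodeAndDecompress(String compressedBase64) throws IOException {
        byte[] compressed = Base64.getDecoder().decode(compressedBase64);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = gzip.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

The resulting JSON can then be turned back into the checkpoint map with 
Jackson, e.g. `objectMapper.readValue(json, new TypeReference<Map<String, 
String>>() {})`.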



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
