Ying Z created FLINK-35502:
------------------------------
Summary: compress the checkpoint metadata ZK/ETCD HA Services
Key: FLINK-35502
URL: https://issues.apache.org/jira/browse/FLINK-35502
Project: Flink
Issue Type: Improvement
Reporter: Ying Z
In the implementation of Flink HA, the checkpoint metadata is stored in
either ZooKeeper (ZK HA) or etcd (K8s HA), for example:
```
checkpointID-0000000000000036044: xxxx
checkpointID-0000000000000036045: xxxx
...
...
```
However, neither of these stores is designed to hold large amounts of data. If
the
[state.checkpoints.num-retained](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained)
setting is too large, it can easily cause failures in ZK/etcd.
The error log when state.checkpoints.num-retained is set to 1500:
```
Caused by: org.apache.flink.util.SerializedThrowable:
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT
at: https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader.
Message: ConfigMap "xxx-jobmanager-leader" is invalid: []:
Too long: must have at most 1048576 bytes. Received status:
Status(apiVersion=v1, code=422,
details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must have
at most 1048576 bytes, reason=FieldValueTooLong, additionalProperties={})],
group=null, kind=ConfigMap, name=xxx-jobmanager-leader, retryAfterSeconds=null,
uid=null, additionalProperties={}), kind=Status, message=ConfigMap
"xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576
bytes, metadata=ListMeta(_continue=null, remainingItemCount=null,
resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid,
status=Failure, additionalProperties={}).
```
In Flink's code, all checkpoint metadata entries are updated at the same time,
and the metadata contains many repeated byte sequences, so it can achieve a
very good compression ratio.
Therefore, I suggest compressing the data when writing checkpoints and
decompressing it when reading, to reduce storage pressure and improve IO
efficiency.
Here is sample code, which reduces the metadata size from about 1 MB to 30 KB.
```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPOutputStream;
import com.fasterxml.jackson.databind.ObjectMapper;

// Map -> JSON
ObjectMapper objectMapper = new ObjectMapper();
String checkpointJson = objectMapper.writeValueAsString(checkpointMap);

// compress and base64-encode, then store everything under a single key
String compressedBase64 = compressAndEncode(checkpointJson);
compressedData.put("checkpoint-all", compressedBase64);

private static String compressAndEncode(String data) throws IOException {
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
        gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
    }
    byte[] compressedData = outputStream.toByteArray();
    return Base64.getEncoder().encodeToString(compressedData);
}
```
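For the read path, the reverse transformation would look roughly like this (a sketch only; `decodeAndDecompress` is a hypothetical helper name mirroring `compressAndEncode` above, not existing Flink code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPInputStream;

public class CheckpointMetadataCodec {
    // Reverse of compressAndEncode: base64-decode, then gunzip back to the JSON string.
    public static String decodeAndDecompress(String compressedBase64) throws IOException {
        byte[] compressed = Base64.getDecoder().decode(compressedBase64);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = gzip.read(buf)) > 0) {
                out.write(buf, 0, n);
            }
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

The decoded JSON can then be mapped back to the checkpoint map with Jackson, so readers see the same per-checkpoint entries as before compression.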
--
This message was sent by Atlassian Jira
(v8.20.10#820010)