[jira] [Commented] (FLINK-35502) compress the checkpoint metadata generated by ZK/ETCD HA Services

2024-06-04 Thread Mingliang Liu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852132#comment-17852132 ]

Mingliang Liu commented on FLINK-35502:
---

In my day job I don't see user requests that need to recover from an arbitrary 
point in the past few days; recovering from recent checkpoints works just fine. 
That said, compression is a good improvement as the metadata grows large.

> compress the checkpoint metadata generated by ZK/ETCD HA Services
> -
>
> Key: FLINK-35502
> URL: https://issues.apache.org/jira/browse/FLINK-35502
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ying Z
>Priority: Major
>
> In Flink's HA implementation, checkpoint metadata is stored in either 
> ZooKeeper (ZK HA) or etcd (Kubernetes HA), for example:
> {code:java}
> checkpointID-0036044: 
> checkpointID-0036045: 
> ...
> ... {code}
> However, neither of these stores is designed to hold large amounts of data. If 
> the 
> [state.checkpoints.num-retained|https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained]
>  setting is too large, it can easily cause failures in ZK/etcd. 
> The error log when state.checkpoints.num-retained is set to 1500:
> {code:java}
> Caused by: org.apache.flink.util.SerializedThrowable: 
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: 
> PUT at: 
> https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader. 
> Message: ConfigMap "xxx-jobmanager-leader" is invalid: []:
> Too long: must have at most 1048576 bytes. Received status: 
> Status(apiVersion=v1, code=422, 
> details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must 
> have at most 1048576 bytes, reason=FieldValueTooLong, 
> additionalProperties={})], group=null, kind=ConfigMap, 
> name=xxx-jobmanager-leader, retryAfterSeconds=null, uid=null, 
> additionalProperties={}), kind=Status, message=ConfigMap 
> "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 
> bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, 
> resourceVersion=null, selfLink=null, additionalProperties={}), 
> reason=Invalid, status=Failure, additionalProperties={}). {code}
> In Flink's code, all checkpoint metadata entries are updated at the same 
> time, and the metadata contains many repeated bytes, so it can achieve a 
> very good compression ratio.
> Therefore, I suggest compressing the data when writing checkpoints and 
> decompressing it when reading, to reduce storage pressure and improve IO 
> efficiency.
> Here is sample code, which reduces the metadata size from 1 MB to 30 KB.
> {code:java}
> // Map -> JSON
> ObjectMapper objectMapper = new ObjectMapper();
> String checkpointJson = objectMapper.writeValueAsString(checkpointMap);
> // compress and Base64-encode
> String compressedBase64 = compressAndEncode(checkpointJson);
> compressedData.put("checkpoint-all", compressedBase64);{code}
> {code:java}
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.nio.charset.StandardCharsets;
> import java.util.Base64;
> import java.util.zip.GZIPOutputStream;
> 
> private static String compressAndEncode(String data) throws IOException {
>     ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
>     try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
>         gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
>     }
>     byte[] compressedData = outputStream.toByteArray();
>     return Base64.getEncoder().encodeToString(compressedData);
> } {code}
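> For the read path, a matching decode-and-decompress sketch could look as 
> follows (it simply mirrors the GZIP + Base64 scheme above; the method name 
> decodeAndDecompress is illustrative, not existing Flink code):
> {code:java}
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.nio.charset.StandardCharsets;
> import java.util.Base64;
> import java.util.zip.GZIPInputStream;
> 
> private static String decodeAndDecompress(String compressedBase64) throws IOException {
>     // Base64-decode first, then gunzip back to the original JSON string.
>     byte[] compressed = Base64.getDecoder().decode(compressedBase64);
>     try (GZIPInputStream gzipInputStream =
>              new GZIPInputStream(new ByteArrayInputStream(compressed));
>          ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
>         byte[] buffer = new byte[4096];
>         int n;
>         while ((n = gzipInputStream.read(buffer)) != -1) {
>             outputStream.write(buffer, 0, n);
>         }
>         return new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
>     }
> } {code}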





[jira] [Commented] (FLINK-35502) compress the checkpoint metadata generated by ZK/ETCD HA Services

2024-06-03 Thread Ying Z (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851859#comment-17851859 ]

Ying Z commented on FLINK-35502:


Yes, I understand that cron + savepoints can partially solve the problem. 
However, we need to recover tasks from any point in the past three days, and 
increasing the cron frequency might also be feasible. 
But I don't understand why we can't just use checkpointing to achieve this 
automatically?



[jira] [Commented] (FLINK-35502) compress the checkpoint metadata generated by ZK/ETCD HA Services

2024-06-03 Thread Mingliang Liu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851744#comment-17851744 ]

Mingliang Liu commented on FLINK-35502:
---

When would you need to restore from checkpoints that are 3 days old? If that 
is for a planned manual operation, have you considered savepoints? In my day 
job, we enable daily savepoints automatically as a cron job in the control 
plane.
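
For reference, such periodic savepoints can be triggered from an external 
scheduler through Flink's REST API (POST /jobs/<jobId>/savepoints). A minimal 
sketch, where the JobManager address, job ID, and target directory are all 
placeholders:
{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Trigger a savepoint for a running job via the JobManager REST API.
// Host, job ID, and target directory are placeholders, not values from
// this issue.
public class TriggerSavepoint {
    public static void main(String[] args) throws Exception {
        String jobId = "00000000000000000000000000000000"; // placeholder
        String body =
            "{\"target-directory\": \"s3://bucket/savepoints\", \"cancel-job\": false}";
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://jobmanager:8081/jobs/" + jobId + "/savepoints"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        // The response carries a trigger id that can be polled for completion.
        System.out.println(response.body());
    }
}{code}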



[jira] [Commented] (FLINK-35502) compress the checkpoint metadata generated by ZK/ETCD HA Services

2024-06-01 Thread Ying Z (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851346#comment-17851346 ]

Ying Z commented on FLINK-35502:


[~roman] 
My usage scenario is as follows:
The checkpoints of the task are stored in $state.checkpoints.dir, and their 
number is determined by $state.checkpoints.num-retained.
For example, if the checkpoint interval is 5 minutes and you want to recover 
from a checkpoint taken 3 days ago when restarting the task, you need to 
retain 60/5*24*3 = 864 checkpoints.
If the checkpoint frequency is higher, or if you need to recover from an 
earlier time, state.checkpoints.num-retained must be set even larger.
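
As a back-of-the-envelope sketch of that sizing arithmetic (the values are 
illustrative, not Flink configuration keys):
{code:java}
// Checkpoints needed to cover a look-back window at a fixed interval.
public class RetentionSizing {
    public static void main(String[] args) {
        long intervalMinutes = 5; // checkpoint interval
        long lookBackDays = 3;    // how far back recovery must reach
        long required = (60 / intervalMinutes) * 24 * lookBackDays;
        System.out.println(required); // 864
    }
}{code}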



[jira] [Commented] (FLINK-35502) compress the checkpoint metadata generated by ZK/ETCD HA Services

2024-06-01 Thread Roman Khachatryan (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851292#comment-17851292 ]

Roman Khachatryan commented on FLINK-35502:
---

[~yingz], could you clarify the use case that requires setting 
state.checkpoints.num-retained to 1500?

Most of the checkpoint data should already be stored on the (distributed) file 
system, and etcd/ZK only stores a pointer to it, so the load on etcd/ZK 
shouldn't be high in normal cases.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)