[jira] [Commented] (FLINK-35502) compress the checkpoint metadata generated by ZK/ETCD HA Services

2024-06-03 Thread Ying Z (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851859#comment-17851859
 ] 

Ying Z commented on FLINK-35502:


Yes, I understand that using cron + savepoint can partially solve the problem. 
However, we need to recover the job from any point in the past three days; 
increasing the cron frequency might also be feasible. 
But I don't understand why we can't just use checkpointing to achieve this 
automatically?

> compress the checkpoint metadata generated by ZK/ETCD HA Services
> -
>
> Key: FLINK-35502
> URL: https://issues.apache.org/jira/browse/FLINK-35502
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ying Z
>Priority: Major
>
> In the implementation of Flink HA, the metadata of checkpoints is stored in 
> either Zookeeper (ZK HA) or ETCD (K8S HA), such as:
> {code:java}
> checkpointID-0036044: 
> checkpointID-0036045: 
> ...
> ... {code}
> However, neither of these is designed to store large amounts of data. If the 
> [state.checkpoints.num-retained](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained)
>  setting is set too large, it can easily cause problems in ZK/ETCD. 
> The error log when state.checkpoints.num-retained is set to 1500:
> {code:java}
> Caused by: org.apache.flink.util.SerializedThrowable: 
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: 
> PUT at: 
> https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader. 
> Message: ConfigMap "xxx-jobmanager-leader" is invalid: []:
> Too long: must have at most 1048576 bytes. Received status: 
> Status(apiVersion=v1, code=422, 
> details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must 
> have at most 1048576 bytes, reason=FieldValueTooLong, 
> additionalProperties={})], group=null, kind=ConfigMap, 
> name=xxx-jobmanager-leader, retryAfterSeconds=null, uid=null, 
> additionalProperties={}), kind=Status, message=ConfigMap 
> "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 
> bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, 
> resourceVersion=null, selfLink=null, additionalProperties={}), 
> reason=Invalid, status=Failure, additionalProperties={}). {code}
> In Flink's code, all checkpoint metadata entries are updated at the same time, 
> and the metadata contains many repeated bytes, so it can achieve a very good 
> compression ratio.
> Therefore, I suggest compressing the data when writing checkpoints and 
> decompressing it when reading, to reduce storage pressure and improve IO 
> efficiency.
> Here is the sample code, which reduces the metadata size from 1 MB to about 30 KB.
> {code:java}
> // Map -> Json
> ObjectMapper objectMapper = new ObjectMapper();
> String checkpointJson = objectMapper.writeValueAsString(checkpointMap);
> // compress and base64-encode
> String compressedBase64 = compressAndEncode(checkpointJson);
> compressedData.put("checkpoint-all", compressedBase64);{code}
> {code:java}
>     private static String compressAndEncode(String data) throws IOException {
>         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
>         try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
>             gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
>         }
>         byte[] compressedData = outputStream.toByteArray();
>         return Base64.getEncoder().encodeToString(compressedData);
>     } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35502) compress the checkpoint metadata generated by ZK/ETCD HA Services

2024-06-01 Thread Ying Z (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851346#comment-17851346
 ] 

Ying Z commented on FLINK-35502:


[~roman] 
My usage scenario is as follows:
The checkpoints of the job are stored in $state.checkpoints.dir, and the 
number of retained checkpoints is determined by $state.checkpoints.num-retained.
For example, if the checkpoint interval is 5 minutes and you want to recover from 
a checkpoint taken 3 days ago when restarting the job, you would need 
to retain 60/5*24*3 = 864 checkpoints.
If the checkpoint frequency is higher, or if you need to recover from an 
earlier point in time, state.checkpoints.num-retained has to be set even larger.
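
A quick sketch of that arithmetic (the helper below is purely illustrative, not 
part of any Flink API):
{code:java}
// Illustrative only: how many retained checkpoints are needed to cover a
// lookback window at a fixed checkpoint interval.
public class RetainedCheckpoints {
    static long requiredRetainedCheckpoints(long lookbackMinutes, long intervalMinutes) {
        return lookbackMinutes / intervalMinutes;
    }

    public static void main(String[] args) {
        // 3 days at a 5-minute interval: (3 * 24 * 60) / 5 = 864 checkpoints.
        System.out.println(requiredRetainedCheckpoints(3 * 24 * 60, 5));
    }
}
{code}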

> compress the checkpoint metadata generated by ZK/ETCD HA Services
> -
>
> Key: FLINK-35502
> URL: https://issues.apache.org/jira/browse/FLINK-35502
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ying Z
>Priority: Major
>
> In the implementation of Flink HA, the metadata of checkpoints is stored in 
> either Zookeeper (ZK HA) or ETCD (K8S HA), such as:
> {code:java}
> checkpointID-0036044: 
> checkpointID-0036045: 
> ...
> ... {code}
> However, neither of these is designed to store large amounts of data. If the 
> [state.checkpoints.num-retained](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained)
>  setting is set too large, it can easily cause problems in ZK/ETCD. 
> The error log when state.checkpoints.num-retained is set to 1500:
> {code:java}
> Caused by: org.apache.flink.util.SerializedThrowable: 
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: 
> PUT at: 
> https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader. 
> Message: ConfigMap "xxx-jobmanager-leader" is invalid: []:
> Too long: must have at most 1048576 bytes. Received status: 
> Status(apiVersion=v1, code=422, 
> details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must 
> have at most 1048576 bytes, reason=FieldValueTooLong, 
> additionalProperties={})], group=null, kind=ConfigMap, 
> name=xxx-jobmanager-leader, retryAfterSeconds=null, uid=null, 
> additionalProperties={}), kind=Status, message=ConfigMap 
> "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 
> bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, 
> resourceVersion=null, selfLink=null, additionalProperties={}), 
> reason=Invalid, status=Failure, additionalProperties={}). {code}
> In Flink's code, all checkpoint metadata entries are updated at the same time, 
> and the metadata contains many repeated bytes, so it can achieve a very good 
> compression ratio.
> Therefore, I suggest compressing the data when writing checkpoints and 
> decompressing it when reading, to reduce storage pressure and improve IO 
> efficiency.
> Here is the sample code, which reduces the metadata size from 1 MB to about 30 KB.
> {code:java}
> // Map -> Json
> ObjectMapper objectMapper = new ObjectMapper();
> String checkpointJson = objectMapper.writeValueAsString(checkpointMap);
> // compress and base64-encode
> String compressedBase64 = compressAndEncode(checkpointJson);
> compressedData.put("checkpoint-all", compressedBase64);{code}
> {code:java}
>     private static String compressAndEncode(String data) throws IOException {
>         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
>         try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
>             gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
>         }
>         byte[] compressedData = outputStream.toByteArray();
>         return Base64.getEncoder().encodeToString(compressedData);
>     } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35502) compress the checkpoint metadata generated by ZK/ETCD HA Services

2024-06-01 Thread Ying Z (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Z updated FLINK-35502:
---
Description: 
In the implementation of Flink HA, the metadata of checkpoints is stored in 
either Zookeeper (ZK HA) or ETCD (K8S HA), such as:
{code:java}
checkpointID-0036044: 
checkpointID-0036045: 
...
... {code}
However, neither of these is designed to store large amounts of data. If the 
[state.checkpoints.num-retained](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained)
 setting is set too large, it can easily cause problems in ZK/ETCD. 

The error log when state.checkpoints.num-retained is set to 1500:
{code:java}
Caused by: org.apache.flink.util.SerializedThrowable: 
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT 
at: https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader. 
Message: ConfigMap "xxx-jobmanager-leader" is invalid: []:
Too long: must have at most 1048576 bytes. Received status: 
Status(apiVersion=v1, code=422, 
details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must have 
at most 1048576 bytes, reason=FieldValueTooLong, additionalProperties={})], 
group=null, kind=ConfigMap, name=xxx-jobmanager-leader, retryAfterSeconds=null, 
uid=null, additionalProperties={}), kind=Status, message=ConfigMap 
"xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 
bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, 
resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, 
status=Failure, additionalProperties={}). {code}
In Flink's code, all checkpoint metadata entries are updated at the same time, and 
the metadata contains many repeated bytes, therefore it can achieve a very good 
compression ratio.

Therefore, I suggest compressing the data when writing checkpoints and 
decompressing it when reading, to reduce storage pressure and improve IO 
efficiency.

Here is the sample code, which reduces the metadata size from 1 MB to about 30 KB.
{code:java}
// Map -> Json
ObjectMapper objectMapper = new ObjectMapper();
String checkpointJson = objectMapper.writeValueAsString(checkpointMap);
// compress and base64-encode
String compressedBase64 = compressAndEncode(checkpointJson);
compressedData.put("checkpoint-all", compressedBase64);{code}
{code:java}
    private static String compressAndEncode(String data) throws IOException {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
            gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
        }
        byte[] compressedData = outputStream.toByteArray();
        return Base64.getEncoder().encodeToString(compressedData);
    } {code}

  was:
In the implementation of Flink HA, the metadata of checkpoints is stored in 
either Zookeeper (ZK HA) or ETCD (K8S HA), such as:


{code:java}
checkpointID-0036044: 
checkpointID-0036045: 
...
... {code}
However, neither of these are designed to store excessive amounts of data. If 
the 
[state.checkpoints.num-retained]([https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained])
 setting is set too large, it can easily cause abnormalities in ZK/ETCD. 

The error log when set state.checkpoints.num-retained to 1500:
{code:java}
Caused by: org.apache.flink.util.SerializedThrowable: 
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT 
at: https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader. 
Message: ConfigMap "xxx-jobmanager-leader" is invalid: []:
Too long: must have at most 1048576 bytes. Received status: 
Status(apiVersion=v1, code=422, 
details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must have 
at most 1048576 bytes, reason=FieldValueTooLong, additionalProperties={})], 
group=null, kind=ConfigMap, name=xxx-jobmanager-leader, retryAfterSeconds=null, 
uid=null, additionalProperties={}), kind=Status, message=ConfigMap 
"xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 
bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, 
resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, 
status=Failure, additionalProperties={}). {code}
In Flink's code, all checkpoint metadata are updated at the same time, and The 
checkpoint metadata contains many repeated bytes, therefore it can achieve a 
very good compression ratio.

Therefore, I suggest compressing the data when writing checkpoints and 
decompressing it when reading, to reduce storage pressure and improve IO 
efficiency.

Here is the sample code, and reduce the metadata size from 1M bytes to 30K.
{code:java}
                //

[jira] [Updated] (FLINK-35502) compress the checkpoint metadata generated by ZK/ETCD HA Services

2024-06-01 Thread Ying Z (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Z updated FLINK-35502:
---
Description: 
In the implementation of Flink HA, the metadata of checkpoints is stored in 
either Zookeeper (ZK HA) or ETCD (K8S HA), such as:


{code:java}
checkpointID-0036044: 
checkpointID-0036045: 
...
... {code}
However, neither of these is designed to store large amounts of data. If the 
[state.checkpoints.num-retained](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained)
 setting is set too large, it can easily cause problems in ZK/ETCD. 

The error log when state.checkpoints.num-retained is set to 1500:
{code:java}
Caused by: org.apache.flink.util.SerializedThrowable: 
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT 
at: https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader. 
Message: ConfigMap "xxx-jobmanager-leader" is invalid: []:
Too long: must have at most 1048576 bytes. Received status: 
Status(apiVersion=v1, code=422, 
details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must have 
at most 1048576 bytes, reason=FieldValueTooLong, additionalProperties={})], 
group=null, kind=ConfigMap, name=xxx-jobmanager-leader, retryAfterSeconds=null, 
uid=null, additionalProperties={}), kind=Status, message=ConfigMap 
"xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 
bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, 
resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, 
status=Failure, additionalProperties={}). {code}
In Flink's code, all checkpoint metadata entries are updated at the same time, and 
the metadata contains many repeated bytes, therefore it can achieve a very good 
compression ratio.

Therefore, I suggest compressing the data when writing checkpoints and 
decompressing it when reading, to reduce storage pressure and improve IO 
efficiency.

Here is the sample code, which reduces the metadata size from 1 MB to about 30 KB.
{code:java}
                // Map -> Json
                ObjectMapper objectMapper = new ObjectMapper();
                String checkpointJson = objectMapper.writeValueAsString(checkpointMap);
                // compress and base64-encode
                String compressedBase64 = compressAndEncode(checkpointJson);
                compressedData.put("checkpoint-all", compressedBase64);

    private static String compressAndEncode(String data) throws IOException {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
            gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
        }
        byte[] compressedData = outputStream.toByteArray();
        return Base64.getEncoder().encodeToString(compressedData);
    } {code}

  was:
In the implementation of Flink HA, the metadata of checkpoints is stored in 
either Zookeeper (ZK HA) or ETCD (K8S HA), such as:

```
checkpointID-0036044: 
checkpointID-0036045: 
...
...
```

However, neither of these are designed to store excessive amounts of data. If 
the 
[state.checkpoints.num-retained](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained)
 setting is set too large, it can easily cause abnormalities in ZK/ETCD. 

The error log when set state.checkpoints.num-retained to 1500:

```
Caused by: org.apache.flink.util.SerializedThrowable: 
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT 
at: https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader. 
Message: ConfigMap "xxx-jobmanager-leader" is invalid: []:
Too long: must have at most 1048576 bytes. Received status: 
Status(apiVersion=v1, code=422, 
details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must have 
at most 1048576 bytes, reason=FieldValueTooLong, additionalProperties={})], 
group=null, kind=ConfigMap, name=xxx-jobmanager-leader, retryAfterSeconds=null, 
uid=null, additionalProperties={}), kind=Status, message=ConfigMap 
"xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 
bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, 
resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, 
status=Failure, additionalProperties={}).
```

In Flink's code, all checkpoint metadata are updated at the same time, and The 
checkpoint metadata contains many repeated bytes, therefore it can achieve a 
very good compression ratio.

Therefore, I suggest compressing the data when writing checkpoints and 
decompressing it when reading, to reduce storage pressure and improve IO 
efficiency.

Here is the sample code, and reduce the metadata size 

[jira] [Updated] (FLINK-35502) compress the checkpoint metadata generated by ZK/ETCD HA Services

2024-06-01 Thread Ying Z (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Z updated FLINK-35502:
---
Summary: compress the checkpoint metadata generated by ZK/ETCD HA Services  
(was: compress the checkpoint metadata ZK/ETCD HA Services)

> compress the checkpoint metadata generated by ZK/ETCD HA Services
> -
>
> Key: FLINK-35502
> URL: https://issues.apache.org/jira/browse/FLINK-35502
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ying Z
>Priority: Major
>
> In the implementation of Flink HA, the metadata of checkpoints is stored in 
> either Zookeeper (ZK HA) or ETCD (K8S HA), such as:
> ```
> checkpointID-0036044: 
> checkpointID-0036045: 
> ...
> ...
> ```
> However, neither of these is designed to store large amounts of data. If the 
> [state.checkpoints.num-retained](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained)
>  setting is set too large, it can easily cause problems in ZK/ETCD. 
> The error log when state.checkpoints.num-retained is set to 1500:
> ```
> Caused by: org.apache.flink.util.SerializedThrowable: 
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: 
> PUT at: 
> https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader. 
> Message: ConfigMap "xxx-jobmanager-leader" is invalid: []:
> Too long: must have at most 1048576 bytes. Received status: 
> Status(apiVersion=v1, code=422, 
> details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must 
> have at most 1048576 bytes, reason=FieldValueTooLong, 
> additionalProperties={})], group=null, kind=ConfigMap, 
> name=xxx-jobmanager-leader, retryAfterSeconds=null, uid=null, 
> additionalProperties={}), kind=Status, message=ConfigMap 
> "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 
> bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, 
> resourceVersion=null, selfLink=null, additionalProperties={}), 
> reason=Invalid, status=Failure, additionalProperties={}).
> ```
> In Flink's code, all checkpoint metadata entries are updated at the same time, 
> and the metadata contains many repeated bytes, so it can achieve a very good 
> compression ratio.
> Therefore, I suggest compressing the data when writing checkpoints and 
> decompressing it when reading, to reduce storage pressure and improve IO 
> efficiency.
> Here is the sample code, which reduces the metadata size from 1 MB to about 30 KB.
> ```java
>                 // Map -> Json
>                 ObjectMapper objectMapper = new ObjectMapper();
>                 String checkpointJson = objectMapper.writeValueAsString(checkpointMap);
>                 // compress and base64-encode
>                 String compressedBase64 = compressAndEncode(checkpointJson);
>                 compressedData.put("checkpoint-all", compressedBase64);
>     private static String compressAndEncode(String data) throws IOException {
>         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
>         try (GZIPOutputStream gzipOutputStream = new 
> GZIPOutputStream(outputStream)) {
>             gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
>         }
>         byte[] compressedData = outputStream.toByteArray();
>         return Base64.getEncoder().encodeToString(compressedData);
>     }
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35502) compress the checkpoint metadata ZK/ETCD HA Services

2024-06-01 Thread Ying Z (Jira)
Ying Z created FLINK-35502:
--

 Summary: compress the checkpoint metadata ZK/ETCD HA Services
 Key: FLINK-35502
 URL: https://issues.apache.org/jira/browse/FLINK-35502
 Project: Flink
  Issue Type: Improvement
Reporter: Ying Z


In the implementation of Flink HA, the metadata of checkpoints is stored in 
either Zookeeper (ZK HA) or ETCD (K8S HA), such as:

```
checkpointID-0036044: 
checkpointID-0036045: 
...
...
```

However, neither of these is designed to store large amounts of data. If the 
[state.checkpoints.num-retained](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained)
 setting is set too large, it can easily cause problems in ZK/ETCD. 

The error log when state.checkpoints.num-retained is set to 1500:

```
Caused by: org.apache.flink.util.SerializedThrowable: 
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT 
at: https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader. 
Message: ConfigMap "xxx-jobmanager-leader" is invalid: []:
Too long: must have at most 1048576 bytes. Received status: 
Status(apiVersion=v1, code=422, 
details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must have 
at most 1048576 bytes, reason=FieldValueTooLong, additionalProperties={})], 
group=null, kind=ConfigMap, name=xxx-jobmanager-leader, retryAfterSeconds=null, 
uid=null, additionalProperties={}), kind=Status, message=ConfigMap 
"xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 
bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, 
resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, 
status=Failure, additionalProperties={}).
```

In Flink's code, all checkpoint metadata entries are updated at the same time, and 
the metadata contains many repeated bytes, therefore it can achieve a very good 
compression ratio.

Therefore, I suggest compressing the data when writing checkpoints and 
decompressing it when reading, to reduce storage pressure and improve IO 
efficiency.

Here is the sample code, which reduces the metadata size from 1 MB to about 30 KB.

```java
                // Map -> Json
                ObjectMapper objectMapper = new ObjectMapper();
                String checkpointJson = objectMapper.writeValueAsString(checkpointMap);
                // compress and base64-encode
                String compressedBase64 = compressAndEncode(checkpointJson);
                compressedData.put("checkpoint-all", compressedBase64);

    private static String compressAndEncode(String data) throws IOException {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        try (GZIPOutputStream gzipOutputStream = new 
GZIPOutputStream(outputStream)) {
            gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
        }
        byte[] compressedData = outputStream.toByteArray();
        return Base64.getEncoder().encodeToString(compressedData);
    }
```
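
For the read path, a matching decode helper could look like the sketch below (a 
minimal sketch assuming the same GZIP + Base64 scheme as compressAndEncode above; 
decodeAndDecompress is an illustrative name, not existing Flink code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPInputStream;

private static String decodeAndDecompress(String compressedBase64) throws IOException {
    // Reverse the write path: Base64-decode first, then gunzip back to the JSON string.
    byte[] compressed = Base64.getDecoder().decode(compressedBase64);
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    try (GZIPInputStream gzipInputStream =
             new GZIPInputStream(new ByteArrayInputStream(compressed))) {
        byte[] buffer = new byte[4096];
        int read;
        while ((read = gzipInputStream.read(buffer)) != -1) {
            outputStream.write(buffer, 0, read);
        }
    }
    // The JSON string can then be mapped back to the checkpoint map with ObjectMapper.
    return new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
}
```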



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-19298) Maven enforce goal dependency-convergence failed on flink-json

2020-09-23 Thread Ying Z (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17200745#comment-17200745
 ] 

Ying Z commented on FLINK-19298:


For com.google.guava:guava:19.0 and 16.0.1, I think we can simply add a 
 tag to the pom.xml[1].

The convergence of janino makes me a little confused:
1. Right now we use Calcite 1.22.0 and janino 3.0.9 [2], and the comments remind us 
{color:#DE350B}{color}
2. In fact, calcite-core 1.22.0 depends on janino 3.0.11 [3]

So is the way to address this issue to set janino.version to 3.0.11, or is 3.0.9 
pinned for historical reasons?

1. 
https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/pom.xml
2. https://github.com/apache/flink/blob/master/flink-table/pom.xml
3. https://mvnrepository.com/artifact/org.apache.calcite/calcite-core/1.22.0

> Maven enforce goal dependency-convergence failed on flink-json
> --
>
> Key: FLINK-19298
> URL: https://issues.apache.org/jira/browse/FLINK-19298
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Critical
>
> See more 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6669&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=9b1a0f88-517b-5893-fc93-76f4670982b4
> {code}
> 2020-09-20T17:08:16.0930669Z 17:08:16.092 [INFO] --- 
> maven-enforcer-plugin:3.0.0-M1:enforce (dependency-convergence) @ flink-json 
> ---
> 2020-09-20T17:08:16.1089006Z 17:08:16.103 [WARNING] 
> 2020-09-20T17:08:16.1089561Z Dependency convergence error for 
> com.google.guava:guava:19.0 paths to dependency are:
> 2020-09-20T17:08:16.1090432Z +-org.apache.flink:flink-json:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1091072Z   
> +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1091670Z +-com.google.guava:guava:19.0
> 2020-09-20T17:08:16.1092014Z and
> 2020-09-20T17:08:16.1092496Z +-org.apache.flink:flink-json:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1093322Z   
> +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1093926Z +-org.apache.calcite:calcite-core:1.22.0
> 2020-09-20T17:08:16.1094521Z   +-org.apache.calcite:calcite-linq4j:1.22.0
> 2020-09-20T17:08:16.1095076Z +-com.google.guava:guava:19.0
> 2020-09-20T17:08:16.1095441Z and
> 2020-09-20T17:08:16.1095927Z +-org.apache.flink:flink-json:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1096726Z   
> +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1097419Z +-org.apache.calcite:calcite-core:1.22.0
> 2020-09-20T17:08:16.1098042Z   +-com.google.guava:guava:19.0
> 2020-09-20T17:08:16.1098435Z and
> 2020-09-20T17:08:16.1098984Z +-org.apache.flink:flink-json:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1099700Z   
> +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1100359Z +-com.google.guava:guava:19.0
> 2020-09-20T17:08:16.1100749Z and
> 2020-09-20T17:08:16.1101293Z +-org.apache.flink:flink-json:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1128892Z   
> +-org.apache.flink:flink-test-utils_2.11:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1129766Z +-org.apache.curator:curator-test:2.12.0
> 2020-09-20T17:08:16.1130466Z   +-com.google.guava:guava:16.0.1
> 2020-09-20T17:08:16.1130843Z 
> 2020-09-20T17:08:16.1131224Z 17:08:16.109 [WARNING] 
> 2020-09-20T17:08:16.1132069Z Dependency convergence error for 
> org.codehaus.janino:commons-compiler:3.0.9 paths to dependency are:
> 2020-09-20T17:08:16.1133127Z +-org.apache.flink:flink-json:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1133906Z   
> +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1134663Z +-org.codehaus.janino:commons-compiler:3.0.9
> 2020-09-20T17:08:16.1135224Z and
> 2020-09-20T17:08:16.1135772Z +-org.apache.flink:flink-json:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1136487Z   
> +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1137150Z +-org.codehaus.janino:janino:3.0.9
> 2020-09-20T17:08:16.1137825Z   
> +-org.codehaus.janino:commons-compiler:3.0.9
> 2020-09-20T17:08:16.1138250Z and
> 2020-09-20T17:08:16.1138798Z +-org.apache.flink:flink-json:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1139514Z   
> +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1141028Z +-org.apache.calcite:calcite-core:1.22.0
> 2020-09-20T17:08:16.1141782Z   
> +-org.codehaus.janino:commons-compiler:3.0.11
> 2020-09-20T17:08:16.1142140Z and
> 2020-09-20T17:08:16.1142635Z +-org.apache.flink:flink-json:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1143270Z   
> +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1143913Z +-org.codehaus.ja

[jira] [Commented] (FLINK-19244) CsvRowDataDeserializationSchema throws cast exception : Row length mismatch.

2020-09-19 Thread Ying Z (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17198768#comment-17198768
 ] 

Ying Z commented on FLINK-19244:


[~jark]
I'd like to open a pull request; could you help assign this ticket to me? Thanks.

> CsvRowDataDeserializationSchema throws cast exception : Row length mismatch.
> 
>
> Key: FLINK-19244
> URL: https://issues.apache.org/jira/browse/FLINK-19244
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.0
>Reporter: Ying Z
>Priority: Major
>  Labels: pull-request-available
>
> CREATE TABLE csv_table (
>  f0 ROW<f0c0 STRING, f0c1 STRING>,
>  f1 ROW<f1c0 INT, f1c1 INT>
>  )
> If f0 is null and f1c0=123, f1c1=456, the serialized data will be: ,123;456
> When deserializing the data, the jsonNode of f0 would be [], which then throws 
> a cast exception: Row length mismatch. 2 fields expected but was 0.
> In the real scenario, I set up two streams:
>  First, read json_table and sink to csv_table, which has the schema above.
>  Then, read csv_table and do something with it.
> If the json is \{"f0": null, "f1": {"f1c0": 123, "f1c1": 456}}, the second 
> stream fails with the exception.
> If this is a bug, I want to help fix it and add unit tests.
>  
> Here is the test code:
> {code:java}
> // code placeholder
> val subDataType0 = ROW(
>   FIELD("f0c0", STRING()),
>   FIELD("f0c1", STRING())
> )
> val subDataType1 = ROW(
>   FIELD("f1c0", INT()),
>   FIELD("f1c1", INT())
> )
> val datatype = ROW(
>   FIELD("f0", subDataType0),
>   FIELD("f1", subDataType1))
> val rowType = datatype.getLogicalType.asInstanceOf[RowType]
> val serSchema = new CsvRowDataSerializationSchema.Builder(rowType).build()
> val deserSchema = new CsvRowDataDeserializationSchema.Builder(rowType, new 
> RowDataTypeInfo(rowType)).build()
> def foo(r: RowData): Unit = {
>   val serData = new String(serSchema.serialize(r))
>   print(s"${serData}")
>   val deserRow = deserSchema.deserialize(serData.getBytes)
>   println(s"${deserRow}")
> }
> val normalRowData = GenericRowData.of(
>   GenericRowData.of(BinaryStringData.fromString("hello"), 
> BinaryStringData.fromString("world")),
>   GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
> )
> // correct.
> foo(normalRowData)
> val nullRowData = GenericRowData.of(
>   null,
>   GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
> )
> /*
> Exception in thread "main" java.io.IOException: Failed to deserialize CSV row 
> ',123;456
> ...
> Caused by: java.lang.RuntimeException: Row length mismatch. 2 fields expected 
> but was 0.
>  */
> foo(nullRowData)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19244) CsvRowDataDeserializationSchema throws cast exception : Row length mismatch.

2020-09-15 Thread Ying Z (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17196668#comment-17196668
 ] 

Ying Z commented on FLINK-19244:


[~jark]

Thanks for your advice; this will also lead users to use formats like 
JSON/Avro/Protobuf for nested types :)

> CsvRowDataDeserializationSchema throws cast exception : Row length mismatch.
> 
>
> Key: FLINK-19244
> URL: https://issues.apache.org/jira/browse/FLINK-19244
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.0
>Reporter: Ying Z
>Priority: Major
>
> CREATE TABLE csv_table (
>  f0 ROW<f0c0 STRING, f0c1 STRING>,
>  f1 ROW<f1c0 INT, f1c1 INT>
>  )
> If f0 is null and f1c0=123, f1c1=456, the serialized data will be: ,123;456
> When deserializing the data, the jsonNode of f0 would be [], which then throws 
> a cast exception: Row length mismatch. 2 fields expected but was 0.
> In the real scenario, I set up two streams:
>  First, read json_table and sink to csv_table, which has the schema above.
>  Then, read csv_table and do something with it.
> If the json is \{"f0": null, "f1": {"f1c0": 123, "f1c1": 456}}, the second 
> stream fails with the exception.
> If this is a bug, I want to help fix it and add unit tests.
>  
> Here is the test code:
> {code:java}
> // code placeholder
> val subDataType0 = ROW(
>   FIELD("f0c0", STRING()),
>   FIELD("f0c1", STRING())
> )
> val subDataType1 = ROW(
>   FIELD("f1c0", INT()),
>   FIELD("f1c1", INT())
> )
> val datatype = ROW(
>   FIELD("f0", subDataType0),
>   FIELD("f1", subDataType1))
> val rowType = datatype.getLogicalType.asInstanceOf[RowType]
> val serSchema = new CsvRowDataSerializationSchema.Builder(rowType).build()
> val deserSchema = new CsvRowDataDeserializationSchema.Builder(rowType, new 
> RowDataTypeInfo(rowType)).build()
> def foo(r: RowData): Unit = {
>   val serData = new String(serSchema.serialize(r))
>   print(s"${serData}")
>   val deserRow = deserSchema.deserialize(serData.getBytes)
>   println(s"${deserRow}")
> }
> val normalRowData = GenericRowData.of(
>   GenericRowData.of(BinaryStringData.fromString("hello"), 
> BinaryStringData.fromString("world")),
>   GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
> )
> // correct.
> foo(normalRowData)
> val nullRowData = GenericRowData.of(
>   null,
>   GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
> )
> /*
> Exception in thread "main" java.io.IOException: Failed to deserialize CSV row 
> ',123;456
> ...
> Caused by: java.lang.RuntimeException: Row length mismatch. 2 fields expected 
> but was 0.
>  */
> foo(nullRowData)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19244) CsvRowDataDeserializationSchema throws cast exception : Row length mismatch.

2020-09-15 Thread Ying Z (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17196636#comment-17196636
 ] 

Ying Z commented on FLINK-19244:


[~jark] ignoreParseErrors=true will skip this record.

x = serialize(y) implies y = deserialize(x), so wouldn't it be better if 
deserialize(',123;456') returned the original RowData?

I totally agree that 'CSV is not a good format to handle nested types'. I use it 
to stay compatible with legacy code, which uses the CSV format because it takes 
less space and is more readable.

> CsvRowDataDeserializationSchema throws cast exception : Row length mismatch.
> 
>
> Key: FLINK-19244
> URL: https://issues.apache.org/jira/browse/FLINK-19244
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.0
>Reporter: Ying Z
>Priority: Major
>
> CREATE TABLE csv_table (
>  f0 ROW<f0c0 STRING, f0c1 STRING>,
>  f1 ROW<f1c0 INT, f1c1 INT>
>  )
> If f0 is null and f1c0=123, f1c1=456, the serialized data will be: ,123;456
> When deserializing the data, the jsonNode of f0 would be [], which then throws 
> a cast exception: Row length mismatch. 2 fields expected but was 0.
> In the real scenario, I set up two streams:
>  First, read json_table and sink to csv_table, which has the schema above.
>  Then, read csv_table and do something with it.
> If the json is \{"f0": null, "f1": {"f1c0": 123, "f1c1": 456}}, the second 
> stream fails with the exception.
> If this is a bug, I want to help fix it and add unit tests.
>  
> Here is the test code:
> {code:java}
> // code placeholder
> val subDataType0 = ROW(
>   FIELD("f0c0", STRING()),
>   FIELD("f0c1", STRING())
> )
> val subDataType1 = ROW(
>   FIELD("f1c0", INT()),
>   FIELD("f1c1", INT())
> )
> val datatype = ROW(
>   FIELD("f0", subDataType0),
>   FIELD("f1", subDataType1))
> val rowType = datatype.getLogicalType.asInstanceOf[RowType]
> val serSchema = new CsvRowDataSerializationSchema.Builder(rowType).build()
> val deserSchema = new CsvRowDataDeserializationSchema.Builder(rowType, new 
> RowDataTypeInfo(rowType)).build()
> def foo(r: RowData): Unit = {
>   val serData = new String(serSchema.serialize(r))
>   print(s"${serData}")
>   val deserRow = deserSchema.deserialize(serData.getBytes)
>   println(s"${deserRow}")
> }
> val normalRowData = GenericRowData.of(
>   GenericRowData.of(BinaryStringData.fromString("hello"), 
> BinaryStringData.fromString("world")),
>   GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
> )
> // correct.
> foo(normalRowData)
> val nullRowData = GenericRowData.of(
>   null,
>   GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
> )
> /*
> Exception in thread "main" java.io.IOException: Failed to deserialize CSV row 
> ',123;456
> ...
> Caused by: java.lang.RuntimeException: Row length mismatch. 2 fields expected 
> but was 0.
>  */
> foo(nullRowData)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-19244) CsvRowDataDeserializationSchema throws cast exception : Row length mismatch.

2020-09-15 Thread Ying Z (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17196636#comment-17196636
 ] 

Ying Z edited comment on FLINK-19244 at 9/16/20, 2:59 AM:
--

[~jark] ignoreParseErrors=true will skip this record.

x = serialize(y) implies y = deserialize(x), so wouldn't it be better if 
deserialize(',123;456') returned the original RowData?

I totally agree that 'CSV is not a good format to handle nested types'. I use it 
to stay compatible with legacy code, which uses the CSV format because it takes 
less space and is more readable.


was (Author: yingz):
[~jark] ignoreParseErrors=true will skip this record

x = serialize(y) means y = deserialize(x), so is deserialize(',123;456') = 
originalRowData maybe better?

 

I totally agree with 'CSV is not a good format to handle nested types'. I apply 
this to compat with the legacy code, which uses csv format for less space and 
more visible.

> CsvRowDataDeserializationSchema throws cast exception : Row length mismatch.
> 
>
> Key: FLINK-19244
> URL: https://issues.apache.org/jira/browse/FLINK-19244
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.0
>Reporter: Ying Z
>Priority: Major
>
> CREATE TABLE csv_table (
>  f0 ROW<f0c0 STRING, f0c1 STRING>,
>  f1 ROW<f1c0 INT, f1c1 INT>
>  )
> If f0 is null and f1c0=123, f1c1=456, the serialized data will be: ,123;456
> When deserializing the data, the jsonNode of f0 would be [], which then throws 
> a cast exception: Row length mismatch. 2 fields expected but was 0.
> In the real scenario, I set up two streams:
>  First, read json_table and sink to csv_table, which has the schema above.
>  Then, read csv_table and do something with it.
> If the json is \{"f0": null, "f1": {"f1c0": 123, "f1c1": 456}}, the second 
> stream fails with the exception.
> If this is a bug, I want to help fix it and add unit tests.
>  
> Here is the test code:
> {code:java}
> // code placeholder
> val subDataType0 = ROW(
>   FIELD("f0c0", STRING()),
>   FIELD("f0c1", STRING())
> )
> val subDataType1 = ROW(
>   FIELD("f1c0", INT()),
>   FIELD("f1c1", INT())
> )
> val datatype = ROW(
>   FIELD("f0", subDataType0),
>   FIELD("f1", subDataType1))
> val rowType = datatype.getLogicalType.asInstanceOf[RowType]
> val serSchema = new CsvRowDataSerializationSchema.Builder(rowType).build()
> val deserSchema = new CsvRowDataDeserializationSchema.Builder(rowType, new 
> RowDataTypeInfo(rowType)).build()
> def foo(r: RowData): Unit = {
>   val serData = new String(serSchema.serialize(r))
>   print(s"${serData}")
>   val deserRow = deserSchema.deserialize(serData.getBytes)
>   println(s"${deserRow}")
> }
> val normalRowData = GenericRowData.of(
>   GenericRowData.of(BinaryStringData.fromString("hello"), 
> BinaryStringData.fromString("world")),
>   GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
> )
> // correct.
> foo(normalRowData)
> val nullRowData = GenericRowData.of(
>   null,
>   GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
> )
> /*
> Exception in thread "main" java.io.IOException: Failed to deserialize CSV row 
> ',123;456
> ...
> Caused by: java.lang.RuntimeException: Row length mismatch. 2 fields expected 
> but was 0.
>  */
> foo(nullRowData)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19244) CsvRowDataDeserializationSchema throws cast exception : Row length mismatch.

2020-09-15 Thread Ying Z (Jira)
Ying Z created FLINK-19244:
--

 Summary: CsvRowDataDeserializationSchema throws cast exception : 
Row length mismatch.
 Key: FLINK-19244
 URL: https://issues.apache.org/jira/browse/FLINK-19244
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.11.0
Reporter: Ying Z


CREATE TABLE csv_table (
 f0 ROW<f0c0 STRING, f0c1 STRING>,
 f1 ROW<f1c0 INT, f1c1 INT>
 )

If f0 is null and f1c0=123, f1c1=456, the serialized data will be: ,123;456
When deserializing the data, the jsonNode of f0 would be [], which then throws a 
cast exception: Row length mismatch. 2 fields expected but was 0.

In the real scenario, I set up two streams:
 First, read json_table and sink to csv_table, which has the schema above.
 Then, read csv_table and do something with it.

If the json is \{"f0": null, "f1": {"f1c0": 123, "f1c1": 456}}, the second stream 
fails with the exception.

If this is a bug, I want to help fix it and add unit tests.

Here is the test code:
{code:java}
// code placeholder
val subDataType0 = ROW(
  FIELD("f0c0", STRING()),
  FIELD("f0c1", STRING())
)
val subDataType1 = ROW(
  FIELD("f1c0", INT()),
  FIELD("f1c1", INT())
)
val datatype = ROW(
  FIELD("f0", subDataType0),
  FIELD("f1", subDataType1))
val rowType = datatype.getLogicalType.asInstanceOf[RowType]

val serSchema = new CsvRowDataSerializationSchema.Builder(rowType).build()
val deserSchema = new CsvRowDataDeserializationSchema.Builder(rowType, new 
RowDataTypeInfo(rowType)).build()
def foo(r: RowData): Unit = {
  val serData = new String(serSchema.serialize(r))
  print(s"${serData}")
  val deserRow = deserSchema.deserialize(serData.getBytes)
  println(s"${deserRow}")
}

val normalRowData = GenericRowData.of(
  GenericRowData.of(BinaryStringData.fromString("hello"), 
BinaryStringData.fromString("world")),
  GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
)
// correct.
foo(normalRowData)

val nullRowData = GenericRowData.of(
  null,
  GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
)
/*
Exception in thread "main" java.io.IOException: Failed to deserialize CSV row 
',123;456
...
Caused by: java.lang.RuntimeException: Row length mismatch. 2 fields expected 
but was 0.
 */
foo(nullRowData)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15719) Exceptions when using scala types directly with the State Process API

2020-08-27 Thread Ying Z (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17186212#comment-17186212
 ] 

Ying Z commented on FLINK-15719:


I opened a pull request here: [https://github.com/apache/flink/pull/13266]

Could someone review it? Thanks.

> Exceptions when using scala types directly with the State Process API
> -
>
> Key: FLINK-15719
> URL: https://issues.apache.org/jira/browse/FLINK-15719
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor
>Affects Versions: 1.9.1
>Reporter: Ying Z
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: pull-request-available
>
> I followed these steps to generate and read states:
>  # implement the example [1] `CountWindowAverage` in Scala (exactly the same), and 
> run jobA => that works fine.
>  # execute `flink cancel -s ${JobID}` => a savepoint was generated as expected.
>  # implement the example [2] `StatefulFunctionWithTime` in Scala (code below), 
> and run jobB => it failed, the exception shows "Caused by: 
> org.apache.flink.util.StateMigrationException: The new key serializer must be 
> compatible."
> ReaderFunction code as below:
> {code:java}
> // code placeholder
>   class ReaderFunction extends KeyedStateReaderFunction[Long, (Long, Long)] {
>     var countState: ValueState[(Long, Long)] = _
>
>     override def open(parameters: Configuration): Unit = {
>       val stateDescriptor = new ValueStateDescriptor("average",
>         createTypeInformation[(Long, Long)])
>       countState = getRuntimeContext().getState(stateDescriptor)
>     }
>
>     override def readKey(key: Long, ctx: KeyedStateReaderFunction.Context,
>         out: Collector[(Long, Long)]): Unit = {
>       out.collect(countState.value())
>     }
>   }
> 1: 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state]
>  
> 2: 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html#keyed-state]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15719) Exceptions when using scala types directly with the State Process API

2020-08-08 Thread Ying Z (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17173617#comment-17173617
 ] 

Ying Z commented on FLINK-15719:


Hi [~tzulitai], I want to help modify the doc [1] to make it less 
error-prone, is that OK? Here is my test code for keyed state in Scala:
 # a stateful process function to generate the state
 # inputs: 1 2 3 4 5 6

 
{code:java}
// code placeholder
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, 
ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

class StatefulFunctionWithTime extends KeyedProcessFunction[Int, Int, Void] {
  var state: ValueState[Int] = _
  var updateTimes: ListState[Long] = _

  @throws[Exception]
  override def open(parameters: Configuration): Unit = {
val stateDescriptor = new ValueStateDescriptor("state", 
createTypeInformation[Int])
//val stateDescriptor = new ValueStateDescriptor("state", Types.INT)
state = getRuntimeContext().getState(stateDescriptor)

val updateDescriptor = new ListStateDescriptor("times", 
createTypeInformation[Long])
//val updateDescriptor = new ListStateDescriptor("times", Types.LONG)
updateTimes = getRuntimeContext().getListState(updateDescriptor)
  }

  @throws[Exception]
  override def processElement(value: Int, ctx: KeyedProcessFunction[ Int, Int, 
Void ]#Context, out: Collector[Void]): Unit = {
state.update(value + 1)
updateTimes.add(System.currentTimeMillis)
  }
}

object KeyedStateSample extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val fsStateBackend = new FsStateBackend("file:///tmp/chk_dir")
  env.setStateBackend(fsStateBackend)
  env.enableCheckpointing(6)
  
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

  env.socketTextStream("127.0.0.1", 8010)
.map(_.toInt)
.keyBy(i => i)
.process(new StatefulFunctionWithTime)
.uid("my-uid")

  env.execute()
}
{code}
 
 # read the state generated by the code above, which outputs:
KeyedState(3,4,List(1596878053283))
KeyedState(5,6,List(1596878055023))
KeyedState(2,3,List(1596878052359))
KeyedState(4,5,List(1596878054098))
KeyedState(6,7,List(1596878056151))
KeyedState(1,2,List(1596878051332))

 
{code:java}
// code placeholder
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, 
ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.state.api.Savepoint
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

/**
 * Description:
 */
object TestReadState extends App {
  val bEnv  = ExecutionEnvironment.getExecutionEnvironment
  val savepoint = Savepoint.load(bEnv, 
"file:///tmp/chk_dir/f988137ef1df4597bebc596ef7c76626/chk-2", new 
MemoryStateBackend)
  val keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction)
  keyedState.print()

  case class KeyedState(key: Int, value: Int, times: List[Long])
  class ReaderFunction extends KeyedStateReaderFunction[java.lang.Integer, 
KeyedState] {
var state: ValueState[Int] = _
var updateTimes: ListState[Long] = _

@throws[Exception]
override def open(parameters: Configuration): Unit = {
  val stateDescriptor = new ValueStateDescriptor("state", 
createTypeInformation[Int])
  state = getRuntimeContext().getState(stateDescriptor)

  val updateDescriptor = new ListStateDescriptor("times", 
createTypeInformation[Long])
  updateTimes = getRuntimeContext().getListState(updateDescriptor)
}

override def readKey(key: java.lang.Integer,
 ctx: KeyedStateReaderFunction.Context,
 out: Collector[KeyedState]): Unit = {
  val data = KeyedState(
key,
state.value(),
updateTimes.get().asScala.toList)
  out.collect(data)
}
  }
}
{code}
 

 

1. 
https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/state_processor_api.html#keyed-state

> Exceptions when using scala types directly with the State Process API
> -
>
> Key: FLINK-15719
> URL: https://issues.apache.org/jira/browse/FLINK-15719
> Project: Flink
>  Issue

[jira] [Commented] (FLINK-18782) Retain the column name when converting a Table to a DataStream

2020-07-31 Thread Ying Z (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168996#comment-17168996
 ] 

Ying Z commented on FLINK-18782:


[~jark] Thanks for your advice ^_^

I have already tried that approach too, and it worked.

But then I ran into a new scenario like this:
{code:java}
// code placeholder
tableEnv.registerDataStream("source_table", sourceStream, 'a, 
'proctime.proctime)

val t1 = tableEnv.sqlQuery(
  """
|SELECT source_table.*, b FROM source_table
|, LATERAL TABLE(foo(a)) as T(b)
|""".stripMargin
)
/*
 t1 table schema: root
 |-- a: INT
 |-- b: INT
 */
println(s"t1 table schema: ${t1.getSchema}")
val t1Stream = t1.toAppendStream[Row](t1.getSchema.toRowType)
{code}
This results in a new error message:

Exception in thread "main" org.apache.flink.table.api.TableException: The time 
indicator type is an internal type only.

 

I can understand that maybe this behavior is undefined by design, but is there a 
way to set the field names manually? I tried this way, but it fails again:
{code:java}
// code placeholder
val t1Stream = t1.toAppendStream[Row]
// t1 stream schema: Row(a: Integer, f0: Integer)
println(s"t1 stream schema: ${t1Stream.getType()}")
tableEnv.registerDataStream("t1", t1Stream, 'a, 'b)
{code}
This results in the error message:

Exception in thread "main" org.apache.flink.table.api.ValidationException: b is 
not a field of type Row(a: Integer, f0: Integer). Expected: a, f0}

 

> Retain the column name when converting a Table to a DataStream
> --
>
> Key: FLINK-18782
> URL: https://issues.apache.org/jira/browse/FLINK-18782
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.9.1
>Reporter: Ying Z
>Priority: Major
>
> mail: 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-retain-the-column-name-when-convert-a-Table-to-DataStream-td37002.html]
>  
> I ran into some field-name errors when trying to convert between Table and DataStream.
>  First, init a DataStream and convert it to the table 'source_table', then register a 
> table function named 'foo':
> {code:java}
> val sourceStream = env.socketTextStream("127.0.0.1", 8010)
>   .map(line => line.toInt)
> tableEnv.registerDataStream("source_table", sourceStream, 'a)
> class Foo() extends TableFunction[(Int)] {
>   def eval(col: Int): Unit = collect((col * 10))
> }
> tableEnv.registerFunction("foo", new Foo)
> {code}
> Then, use sqlQuery to generate a new table t1 with columns 'a' 'b'
> {code:java}
> val t1 = tableEnv.sqlQuery(
>   """
> |SELECT source_table.a, b FROM source_table
> |, LATERAL TABLE(foo(a)) as T(b)
> |""".stripMargin
> )
> /*
>  t1 table schema: root
>  |-- a: INT
>  |-- b: INT
>  */
> println(s"t1 table schema: ${t1.getSchema}")
> {code}
> When I try to convert 't1' to a DataStream and then register it as a new table (for 
> some reason) named 't1', the columns change to 'a' 'f0', not 'a' 'b':
> {code:java}
> val t1Stream = t1.toAppendStream[Row]
> // t1 stream schema: Row(a: Integer, f0: Integer)
> println(s"t1 stream schema: ${t1Stream.getType()}")
> tableEnv.registerDataStream("t1", t1Stream)
> /*
> new t1 table schema: root
> |-- a: INT
> |-- f0: INT
>  */
> println(s"new t1 table schema: ${tableEnv.scan("t1").getSchema}")
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18782) How to retain the column'name when convert a Table to DataStream

2020-07-31 Thread Ying Z (Jira)
Ying Z created FLINK-18782:
--

 Summary: How to retain the column'name when convert a Table to 
DataStream
 Key: FLINK-18782
 URL: https://issues.apache.org/jira/browse/FLINK-18782
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.9.1
Reporter: Ying Z


mail: 
[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-retain-the-column-name-when-convert-a-Table-to-DataStream-td37002.html]

 

I ran into some field-name errors when trying to convert between Table and DataStream.
 First, init a DataStream and convert it to the table 'source_table', then register a 
table function named 'foo':
{code:java}
val sourceStream = env.socketTextStream("127.0.0.1", 8010)
  .map(line => line.toInt)
tableEnv.registerDataStream("source_table", sourceStream, 'a)

class Foo() extends TableFunction[(Int)] {
  def eval(col: Int): Unit = collect((col * 10))
}
tableEnv.registerFunction("foo", new Foo)
{code}
Then, use sqlQuery to generate a new table t1 with columns 'a' 'b'
{code:java}
val t1 = tableEnv.sqlQuery(
  """
|SELECT source_table.a, b FROM source_table
|, LATERAL TABLE(foo(a)) as T(b)
|""".stripMargin
)
/*
 t1 table schema: root
 |-- a: INT
 |-- b: INT
 */
println(s"t1 table schema: ${t1.getSchema}")
{code}
When I try to convert 't1' to a DataStream and then register it as a new table (for 
some reason) named 't1', the columns change to 'a' 'f0', not 'a' 'b':
{code:java}
val t1Stream = t1.toAppendStream[Row]
// t1 stream schema: Row(a: Integer, f0: Integer)
println(s"t1 stream schema: ${t1Stream.getType()}")
tableEnv.registerDataStream("t1", t1Stream)
/*
new t1 table schema: root
|-- a: INT
|-- f0: INT
 */
println(s"new t1 table schema: ${tableEnv.scan("t1").getSchema}")
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15719) Exceptions when using scala types directly with the State Process API

2020-01-21 Thread Ying Z (Jira)
Ying Z created FLINK-15719:
--

 Summary: Exceptions when using scala types directly with the State 
Process API
 Key: FLINK-15719
 URL: https://issues.apache.org/jira/browse/FLINK-15719
 Project: Flink
  Issue Type: Bug
  Components: API / State Processor
Affects Versions: 1.9.1
Reporter: Ying Z


I followed these steps to generate and read states:
 # implement the example [1] `CountWindowAverage` in Scala (exactly the same), and 
run jobA => that works fine.
 # execute `flink cancel -s ${JobID}` => a savepoint was generated as expected.
 # implement the example [2] `StatefulFunctionWithTime` in Scala (code below), 
and run jobB => it failed, the exception shows "Caused by: 
org.apache.flink.util.StateMigrationException: The new key serializer must be 
compatible."

ReaderFunction code as below:
{code:java}
// code placeholder
  class ReaderFunction extends KeyedStateReaderFunction[Long, (Long, Long)] {
    var countState: ValueState[(Long, Long)] = _

    override def open(parameters: Configuration): Unit = {
      val stateDescriptor = new ValueStateDescriptor("average",
        createTypeInformation[(Long, Long)])
      countState = getRuntimeContext().getState(stateDescriptor)
    }

    override def readKey(key: Long, ctx: KeyedStateReaderFunction.Context,
        out: Collector[(Long, Long)]): Unit = {
      out.collect(countState.value())
    }
  }
{code}
1: 
[https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state]
 

2: 
[https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html#keyed-state]
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)