[jira] [Commented] (FLINK-35502) compress the checkpoint metadata generated by ZK/ETCD HA Services
[ https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851859#comment-17851859 ] Ying Z commented on FLINK-35502:

Yes, I understand that cron + savepoint can partially solve the problem, and increasing the cron frequency may also be feasible. However, we need to recover the job from any point in the past three days, and I don't understand why we can't simply use checkpoints to achieve this automatically?

> compress the checkpoint metadata generated by ZK/ETCD HA Services
> -
>
> Key: FLINK-35502
> URL: https://issues.apache.org/jira/browse/FLINK-35502
> Project: Flink
> Issue Type: Improvement
> Reporter: Ying Z
> Priority: Major
>
> In the implementation of Flink HA, the checkpoint metadata is stored in either ZooKeeper (ZK HA) or etcd (K8s HA), for example:
> {code:java}
> checkpointID-0036044:
> checkpointID-0036045:
> ...
> ...
> {code}
> However, neither of these stores is designed to hold large amounts of data. If the [state.checkpoints.num-retained](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained) setting is too large, it can easily cause failures in ZK/etcd.
> The error log when state.checkpoints.num-retained is set to 1500:
> {code:java}
> Caused by: org.apache.flink.util.SerializedThrowable:
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing:
> PUT at: https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader.
> Message: ConfigMap "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 bytes.
> Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must have at most 1048576 bytes, reason=FieldValueTooLong, additionalProperties={})], group=null, kind=ConfigMap, name=xxx-jobmanager-leader, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=ConfigMap "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={}).
> {code}
> In Flink's code, all checkpoint metadata entries are updated at the same time, and the metadata contains many repeated bytes, so it achieves a very good compression ratio.
> Therefore, I suggest compressing the data when writing checkpoints and decompressing it when reading, to reduce storage pressure and improve IO efficiency.
> Here is sample code, which reduces the metadata size from about 1 MB to about 30 KB.
> {code:java}
> // Map -> JSON
> ObjectMapper objectMapper = new ObjectMapper();
> String checkpointJson = objectMapper.writeValueAsString(checkpointMap);
> // compress and Base64-encode
> String compressedBase64 = compressAndEncode(checkpointJson);
> compressedData.put("checkpoint-all", compressedBase64);
> {code}
> {code:java}
> private static String compressAndEncode(String data) throws IOException {
>     ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
>     try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
>         gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
>     }
>     byte[] compressedData = outputStream.toByteArray();
>     return Base64.getEncoder().encodeToString(compressedData);
> }
> {code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
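The gzip + Base64 scheme proposed above can be sketched end to end with only the JDK. The write path follows the ticket's `compressAndEncode`; the read path (`decodeAndDecompress`) is my assumption of the inverse, not code from the ticket, and the class name is hypothetical:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CheckpointMetadataCodec {

    // Write path from the ticket: gzip the JSON, then Base64-encode so the
    // result is plain text that fits in a ZK node or ConfigMap value.
    static String compressAndEncode(String data) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(buffer)) {
            gzip.write(data.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getEncoder().encodeToString(buffer.toByteArray());
    }

    // Hypothetical read path (not in the ticket): Base64-decode, then gunzip
    // back to the original JSON string.
    static String decodeAndDecompress(String encoded) {
        byte[] compressed = Base64.getDecoder().decode(encoded);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = gzip.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

The near-identical keys (`checkpointID-0036044`, `checkpointID-0036045`, ...) and repeated state handle paths sit well inside gzip's match window, which is why the ticket observes such a strong ratio; Base64 adds back roughly a third in size but keeps the value valid as ConfigMap/ZK string data.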
[jira] [Commented] (FLINK-35502) compress the checkpoint metadata generated by ZK/ETCD HA Services
[ https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851346#comment-17851346 ] Ying Z commented on FLINK-35502:

[~roman] My usage scenario is as follows: the checkpoints of the job are stored in $state.checkpoints.dir, and their number is controlled by state.checkpoints.num-retained. For example, if the checkpoint interval is 5 minutes and you want to restart the job from a checkpoint taken up to 3 days ago, you need to retain 60/5 * 24 * 3 = 864 checkpoints. If the checkpoint frequency is higher, or you need to recover from an earlier point in time, state.checkpoints.num-retained must be set even larger.

> compress the checkpoint metadata generated by ZK/ETCD HA Services
> -
>
> Key: FLINK-35502
> URL: https://issues.apache.org/jira/browse/FLINK-35502
> Project: Flink
> Issue Type: Improvement
> Reporter: Ying Z
> Priority: Major
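The retention arithmetic in the comment above generalizes to any interval and recovery window; a tiny helper (hypothetical, not a Flink API) makes the relationship explicit:

```java
public class RetainedCheckpoints {

    // Checkpoints that must be retained to cover a recovery window of
    // `windowDays` days when a checkpoint is taken every `intervalMinutes`
    // minutes: (60 / interval) checkpoints per hour * 24 hours * days.
    static long requiredNumRetained(long intervalMinutes, long windowDays) {
        return (60 / intervalMinutes) * 24 * windowDays;
    }
}
```

`requiredNumRetained(5, 3)` reproduces the 864 from the comment; halving the interval doubles the requirement, which is why the amount of HA metadata grows quickly enough to hit the 1 MiB ConfigMap limit.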
[jira] [Updated] (FLINK-35502) compress the checkpoint metadata generated by ZK/ETCD HA Services
[ https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Z updated FLINK-35502:
---
Description:
In the implementation of Flink HA, the checkpoint metadata is stored in either ZooKeeper (ZK HA) or etcd (K8s HA), for example:
{code:java}
checkpointID-0036044:
checkpointID-0036045:
...
...
{code}
However, neither of these stores is designed to hold large amounts of data. If the [state.checkpoints.num-retained](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained) setting is too large, it can easily cause failures in ZK/etcd.
The error log when state.checkpoints.num-retained is set to 1500:
{code:java}
Caused by: org.apache.flink.util.SerializedThrowable: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader. Message: ConfigMap "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 bytes. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must have at most 1048576 bytes, reason=FieldValueTooLong, additionalProperties={})], group=null, kind=ConfigMap, name=xxx-jobmanager-leader, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=ConfigMap "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={}).
{code}
In Flink's code, all checkpoint metadata entries are updated at the same time, and the metadata contains many repeated bytes, so it achieves a very good compression ratio.
Therefore, I suggest compressing the data when writing checkpoints and decompressing it when reading, to reduce storage pressure and improve IO efficiency. Here is sample code, which reduces the metadata size from about 1 MB to about 30 KB.
{code:java}
// Map -> JSON
ObjectMapper objectMapper = new ObjectMapper();
String checkpointJson = objectMapper.writeValueAsString(checkpointMap);
// compress and Base64-encode
String compressedBase64 = compressAndEncode(checkpointJson);
compressedData.put("checkpoint-all", compressedBase64);
{code}
{code:java}
private static String compressAndEncode(String data) throws IOException {
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
        gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
    }
    byte[] compressedData = outputStream.toByteArray();
    return Base64.getEncoder().encodeToString(compressedData);
}
{code}

was:
In the implementation of Flink HA, the checkpoint metadata is stored in either ZooKeeper (ZK HA) or etcd (K8s HA), for example:
{code:java}
checkpointID-0036044:
checkpointID-0036045:
...
...
{code}
However, neither of these stores is designed to hold large amounts of data. If the [state.checkpoints.num-retained](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained) setting is too large, it can easily cause failures in ZK/etcd.
The error log when state.checkpoints.num-retained is set to 1500:
{code:java}
Caused by: org.apache.flink.util.SerializedThrowable: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader. Message: ConfigMap "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 bytes. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must have at most 1048576 bytes, reason=FieldValueTooLong, additionalProperties={})], group=null, kind=ConfigMap, name=xxx-jobmanager-leader, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=ConfigMap "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={}).
{code}
In Flink's code, all checkpoint metadata entries are updated at the same time, and the metadata contains many repeated bytes, so it achieves a very good compression ratio.
Therefore, I suggest compressing the data when writing checkpoints and decompressing it when reading, to reduce storage pressure and improve IO efficiency. Here is sample code, which reduces the metadata size from about 1 MB to about 30 KB.
{code:java}
//
[jira] [Updated] (FLINK-35502) compress the checkpoint metadata generated by ZK/ETCD HA Services
[ https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Z updated FLINK-35502:
---
Description:
In the implementation of Flink HA, the checkpoint metadata is stored in either ZooKeeper (ZK HA) or etcd (K8s HA), for example:
{code:java}
checkpointID-0036044:
checkpointID-0036045:
...
...
{code}
However, neither of these stores is designed to hold large amounts of data. If the [state.checkpoints.num-retained](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained) setting is too large, it can easily cause failures in ZK/etcd.
The error log when state.checkpoints.num-retained is set to 1500:
{code:java}
Caused by: org.apache.flink.util.SerializedThrowable: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader. Message: ConfigMap "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 bytes. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must have at most 1048576 bytes, reason=FieldValueTooLong, additionalProperties={})], group=null, kind=ConfigMap, name=xxx-jobmanager-leader, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=ConfigMap "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={}).
{code}
In Flink's code, all checkpoint metadata entries are updated at the same time, and the metadata contains many repeated bytes, so it achieves a very good compression ratio.
Therefore, I suggest compressing the data when writing checkpoints and decompressing it when reading, to reduce storage pressure and improve IO efficiency. Here is sample code, which reduces the metadata size from about 1 MB to about 30 KB.
{code:java}
// Map -> JSON
ObjectMapper objectMapper = new ObjectMapper();
String checkpointJson = objectMapper.writeValueAsString(checkpointMap);
// compress and Base64-encode
String compressedBase64 = compressAndEncode(checkpointJson);
compressedData.put("checkpoint-all", compressedBase64);

private static String compressAndEncode(String data) throws IOException {
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
        gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
    }
    byte[] compressedData = outputStream.toByteArray();
    return Base64.getEncoder().encodeToString(compressedData);
}
{code}

was:
In the implementation of Flink HA, the checkpoint metadata is stored in either ZooKeeper (ZK HA) or etcd (K8s HA), for example:
```
checkpointID-0036044:
checkpointID-0036045:
...
...
```
However, neither of these stores is designed to hold large amounts of data. If the [state.checkpoints.num-retained](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained) setting is too large, it can easily cause failures in ZK/etcd.
The error log when state.checkpoints.num-retained is set to 1500:
```
Caused by: org.apache.flink.util.SerializedThrowable: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader. Message: ConfigMap "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 bytes. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must have at most 1048576 bytes, reason=FieldValueTooLong, additionalProperties={})], group=null, kind=ConfigMap, name=xxx-jobmanager-leader, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=ConfigMap "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={}).
```
In Flink's code, all checkpoint metadata entries are updated at the same time, and the metadata contains many repeated bytes, so it achieves a very good compression ratio.
Therefore, I suggest compressing the data when writing checkpoints and decompressing it when reading, to reduce storage pressure and improve IO efficiency. Here is sample code, which reduces the metadata size
[jira] [Updated] (FLINK-35502) compress the checkpoint metadata generated by ZK/ETCD HA Services
[ https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Z updated FLINK-35502:
---
Summary: compress the checkpoint metadata generated by ZK/ETCD HA Services (was: compress the checkpoint metadata ZK/ETCD HA Services)

> compress the checkpoint metadata generated by ZK/ETCD HA Services
> -
>
> Key: FLINK-35502
> URL: https://issues.apache.org/jira/browse/FLINK-35502
> Project: Flink
> Issue Type: Improvement
> Reporter: Ying Z
> Priority: Major
[jira] [Created] (FLINK-35502) compress the checkpoint metadata ZK/ETCD HA Services
Ying Z created FLINK-35502:
--
Summary: compress the checkpoint metadata ZK/ETCD HA Services
Key: FLINK-35502
URL: https://issues.apache.org/jira/browse/FLINK-35502
Project: Flink
Issue Type: Improvement
Reporter: Ying Z

In the implementation of Flink HA, the checkpoint metadata is stored in either ZooKeeper (ZK HA) or etcd (K8s HA), for example:
```
checkpointID-0036044:
checkpointID-0036045:
...
...
```
However, neither of these stores is designed to hold large amounts of data. If the [state.checkpoints.num-retained](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained) setting is too large, it can easily cause failures in ZK/etcd.
The error log when state.checkpoints.num-retained is set to 1500:
```
Caused by: org.apache.flink.util.SerializedThrowable: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader. Message: ConfigMap "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 bytes. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must have at most 1048576 bytes, reason=FieldValueTooLong, additionalProperties={})], group=null, kind=ConfigMap, name=xxx-jobmanager-leader, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=ConfigMap "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={}).
```
In Flink's code, all checkpoint metadata entries are updated at the same time, and the metadata contains many repeated bytes, so it achieves a very good compression ratio.
Therefore, I suggest compressing the data when writing checkpoints and decompressing it when reading, to reduce storage pressure and improve IO efficiency. Here is sample code, which reduces the metadata size from about 1 MB to about 30 KB.
```java
// Map -> JSON
ObjectMapper objectMapper = new ObjectMapper();
String checkpointJson = objectMapper.writeValueAsString(checkpointMap);
// compress and Base64-encode
String compressedBase64 = compressAndEncode(checkpointJson);
compressedData.put("checkpoint-all", compressedBase64);

private static String compressAndEncode(String data) throws IOException {
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
        gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
    }
    byte[] compressedData = outputStream.toByteArray();
    return Base64.getEncoder().encodeToString(compressedData);
}
```
[jira] [Commented] (FLINK-19298) Maven enforce goal dependency-convergence failed on flink-json
[ https://issues.apache.org/jira/browse/FLINK-19298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17200745#comment-17200745 ] Ying Z commented on FLINK-19298:

For com.google.guava:guava:19.0 and 16.0.1, I think we can simply add a tag to the pom.xml [1]. The convergence of janino confuses me a little:
1. We currently use calcite 1.22.0 and janino 3.0.9 [2], and the comments there remind us of this.
2. In fact, calcite-core 1.22.0 depends on janino 3.0.11 [3].
So should this issue be addressed by setting janino.version to 3.0.11, or is 3.0.9 pinned for historical reasons?

1. https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/pom.xml
2. https://github.com/apache/flink/blob/master/flink-table/pom.xml
3. https://mvnrepository.com/artifact/org.apache.calcite/calcite-core/1.22.0

> Maven enforce goal dependency-convergence failed on flink-json
> -
>
> Key: FLINK-19298
> URL: https://issues.apache.org/jira/browse/FLINK-19298
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.12.0
> Reporter: Jark Wu
> Priority: Critical
>
> See more https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6669&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=9b1a0f88-517b-5893-fc93-76f4670982b4
> {code}
> 2020-09-20T17:08:16.0930669Z 17:08:16.092 [INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (dependency-convergence) @ flink-json ---
> 2020-09-20T17:08:16.1089006Z 17:08:16.103 [WARNING]
> 2020-09-20T17:08:16.1089561Z Dependency convergence error for com.google.guava:guava:19.0 paths to dependency are:
> 2020-09-20T17:08:16.1090432Z +-org.apache.flink:flink-json:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1091072Z   +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1091670Z     +-com.google.guava:guava:19.0
> 2020-09-20T17:08:16.1092014Z and
> 2020-09-20T17:08:16.1092496Z +-org.apache.flink:flink-json:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1093322Z   +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1093926Z     +-org.apache.calcite:calcite-core:1.22.0
> 2020-09-20T17:08:16.1094521Z       +-org.apache.calcite:calcite-linq4j:1.22.0
> 2020-09-20T17:08:16.1095076Z         +-com.google.guava:guava:19.0
> 2020-09-20T17:08:16.1095441Z and
> 2020-09-20T17:08:16.1095927Z +-org.apache.flink:flink-json:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1096726Z   +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1097419Z     +-org.apache.calcite:calcite-core:1.22.0
> 2020-09-20T17:08:16.1098042Z       +-com.google.guava:guava:19.0
> 2020-09-20T17:08:16.1098435Z and
> 2020-09-20T17:08:16.1098984Z +-org.apache.flink:flink-json:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1099700Z   +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1100359Z     +-com.google.guava:guava:19.0
> 2020-09-20T17:08:16.1100749Z and
> 2020-09-20T17:08:16.1101293Z +-org.apache.flink:flink-json:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1128892Z   +-org.apache.flink:flink-test-utils_2.11:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1129766Z     +-org.apache.curator:curator-test:2.12.0
> 2020-09-20T17:08:16.1130466Z       +-com.google.guava:guava:16.0.1
> 2020-09-20T17:08:16.1130843Z
> 2020-09-20T17:08:16.1131224Z 17:08:16.109 [WARNING]
> 2020-09-20T17:08:16.1132069Z Dependency convergence error for org.codehaus.janino:commons-compiler:3.0.9 paths to dependency are:
> 2020-09-20T17:08:16.1133127Z +-org.apache.flink:flink-json:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1133906Z   +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1134663Z     +-org.codehaus.janino:commons-compiler:3.0.9
> 2020-09-20T17:08:16.1135224Z and
> 2020-09-20T17:08:16.1135772Z +-org.apache.flink:flink-json:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1136487Z   +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1137150Z     +-org.codehaus.janino:janino:3.0.9
> 2020-09-20T17:08:16.1137825Z       +-org.codehaus.janino:commons-compiler:3.0.9
> 2020-09-20T17:08:16.1138250Z and
> 2020-09-20T17:08:16.1138798Z +-org.apache.flink:flink-json:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1139514Z   +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1141028Z     +-org.apache.calcite:calcite-core:1.22.0
> 2020-09-20T17:08:16.1141782Z       +-org.codehaus.janino:commons-compiler:3.0.11
> 2020-09-20T17:08:16.1142140Z and
> 2020-09-20T17:08:16.1142635Z +-org.apache.flink:flink-json:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1143270Z   +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
> 2020-09-20T17:08:16.1143913Z     +-org.codehaus.ja
[jira] [Commented] (FLINK-19244) CsvRowDataDeserializationSchema throws cast exception : Row length mismatch.
[ https://issues.apache.org/jira/browse/FLINK-19244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17198768#comment-17198768 ] Ying Z commented on FLINK-19244:

[~jark] I've opened a pull request; could you help assign this ticket to me? Thanks.

> CsvRowDataDeserializationSchema throws cast exception : Row length mismatch.
> -
>
> Key: FLINK-19244
> URL: https://issues.apache.org/jira/browse/FLINK-19244
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.11.0
> Reporter: Ying Z
> Priority: Major
> Labels: pull-request-available
>
> CREATE TABLE csv_table (
>   f0 ROW<f0c0 STRING, f0c1 STRING>,
>   f1 ROW<f1c0 INT, f1c1 INT>
> )
> If f0 is null and f1c0=123, f1c1=456, the serialized data will be: ,123;456
> When deserializing the data, the jsonNode of f0 would be [], which then throws the cast exception: Row length mismatch. 2 fields expected but was 0.
> In the real scene, I set up two streams:
> First, read json_table and sink to csv_table, which has the schema above.
> Then, read csv_table and do something with it.
> If the json is {"f0": null, "f1": {"f1c0": 123, "f1c1": 456}}, the second stream fails with the exception.
> If this is a bug, I want to help fix it and add unit tests.
>
> Here is the test code:
> {code:java}
> val subDataType0 = ROW(
>   FIELD("f0c0", STRING()),
>   FIELD("f0c1", STRING())
> )
> val subDataType1 = ROW(
>   FIELD("f1c0", INT()),
>   FIELD("f1c1", INT())
> )
> val datatype = ROW(
>   FIELD("f0", subDataType0),
>   FIELD("f1", subDataType1))
> val rowType = datatype.getLogicalType.asInstanceOf[RowType]
> val serSchema = new CsvRowDataSerializationSchema.Builder(rowType).build()
> val deserSchema = new CsvRowDataDeserializationSchema.Builder(rowType, new RowDataTypeInfo(rowType)).build()
>
> def foo(r: RowData): Unit = {
>   val serData = new String(serSchema.serialize(r))
>   print(s"${serData}")
>   val deserRow = deserSchema.deserialize(serData.getBytes)
>   println(s"${deserRow}")
> }
>
> val normalRowData = GenericRowData.of(
>   GenericRowData.of(BinaryStringData.fromString("hello"), BinaryStringData.fromString("world")),
>   GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
> )
> // correct.
> foo(normalRowData)
>
> val nullRowData = GenericRowData.of(
>   null,
>   GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
> )
> /*
> Exception in thread "main" java.io.IOException: Failed to deserialize CSV row ',123;456
> ...
> Caused by: java.lang.RuntimeException: Row length mismatch. 2 fields expected but was 0.
> */
> foo(nullRowData)
> {code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
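A stdlib-only sketch (not Flink's actual CsvRowDataDeserializationSchema, and the class/method names are hypothetical) of why the record `,123;456` from the report above is ambiguous: the null top-level ROW serializes to an empty field, which on the way back no longer splits into the nested columns the schema expects.

```java
public class CsvNullRowAmbiguity {

    // Top-level CSV fields; limit -1 keeps empty fields instead of dropping them.
    static String[] topLevelFields(String record) {
        return record.split(",", -1);
    }

    // Nested ROW components, separated by ';' in the serialized form.
    static String[] nestedFields(String field) {
        return field.split(";", -1);
    }
}
```

For `,123;456`, `topLevelFields` yields an empty string followed by `123;456`; splitting the empty string on `;` produces a single empty component rather than the two fields of the nested ROW, which is the kind of length mismatch the reported exception describes. The serialized form simply cannot distinguish "null row" from "row of empty strings", which is why the discussion steers nested types toward JSON/Avro/Protobuf.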
[jira] [Commented] (FLINK-19244) CsvRowDataDeserializationSchema throws cast exception : Row length mismatch.
[ https://issues.apache.org/jira/browse/FLINK-19244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17196668#comment-17196668 ] Ying Z commented on FLINK-19244: [~jark] Thanks for your advice, this will lead the user to use format like json/arvo/protobuf for nested types too:) > CsvRowDataDeserializationSchema throws cast exception : Row length mismatch. > > > Key: FLINK-19244 > URL: https://issues.apache.org/jira/browse/FLINK-19244 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.11.0 >Reporter: Ying Z >Priority: Major > > CREATE TABLE csv_table ( > f0 ROW, > f1 ROW > ) > If f0 is null and f1c0=123, f1c1=456, the serialized data will be: ,123;456 > When deserialize the data, the jsonNode of f0 would be [], then throws cast > exception: Row length mismatch. 2 fields expected but was 0. > In the real scene, I set two streams: > First, read json_table, sink to csv_table, which has the schema above. > Then, read csv_table, do sth. > if json is \{"f0": null, "f1": {"f1c0": 123, "f1c1": 456}}, the second > streams failed with the exception. > If this is a bug, I want to help to fix this and unittests. 
[jira] [Commented] (FLINK-19244) CsvRowDataDeserializationSchema throws cast exception: Row length mismatch.
[ https://issues.apache.org/jira/browse/FLINK-19244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17196636#comment-17196636 ] Ying Z commented on FLINK-19244: [~jark] ignoreParseErrors=true will skip this record. Since x = serialize(y) implies y = deserialize(x), wouldn't it be better if deserialize(',123;456') returned the original RowData? I fully agree that 'CSV is not a good format to handle nested types'. I use it here for compatibility with legacy code, which relies on the CSV format for its smaller size and better readability. > CsvRowDataDeserializationSchema throws cast exception: Row length mismatch. > > Key: FLINK-19244 > URL: https://issues.apache.org/jira/browse/FLINK-19244 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) > Affects Versions: 1.11.0 > Reporter: Ying Z > Priority: Major
[jira] [Comment Edited] (FLINK-19244) CsvRowDataDeserializationSchema throws cast exception: Row length mismatch.
[ https://issues.apache.org/jira/browse/FLINK-19244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17196636#comment-17196636 ] Ying Z edited comment on FLINK-19244 at 9/16/20, 2:59 AM: -- [~jark] ignoreParseErrors=true will skip this record. Since x = serialize(y) implies y = deserialize(x), wouldn't it be better if deserialize(',123;456') returned the original RowData? I fully agree that 'CSV is not a good format to handle nested types'. I use it here for compatibility with legacy code, which relies on the CSV format for its smaller size and better readability.
[jira] [Created] (FLINK-19244) CsvRowDataDeserializationSchema throws cast exception: Row length mismatch.
Ying Z created FLINK-19244: -- Summary: CsvRowDataDeserializationSchema throws cast exception: Row length mismatch. Key: FLINK-19244 URL: https://issues.apache.org/jira/browse/FLINK-19244 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.11.0 Reporter: Ying Z

CREATE TABLE csv_table (
  f0 ROW<f0c0 STRING, f0c1 STRING>,
  f1 ROW<f1c0 INT, f1c1 INT>
)

If f0 is null and f1c0=123, f1c1=456, the serialized data will be: ,123;456

When deserializing the data, the jsonNode of f0 will be [], which then throws the cast exception: Row length mismatch. 2 fields expected but was 0.

In the real scenario, I set up two streams: first, read json_table and sink to csv_table, which has the schema above; then, read csv_table and process it further. If the JSON is \{"f0": null, "f1": {"f1c0": 123, "f1c1": 456}}, the second stream fails with the exception. If this is a bug, I want to help fix it and add unit tests.

Here is the test code:

{code:java}
val subDataType0 = ROW(
  FIELD("f0c0", STRING()),
  FIELD("f0c1", STRING())
)
val subDataType1 = ROW(
  FIELD("f1c0", INT()),
  FIELD("f1c1", INT())
)
val datatype = ROW(
  FIELD("f0", subDataType0),
  FIELD("f1", subDataType1))
val rowType = datatype.getLogicalType.asInstanceOf[RowType]
val serSchema = new CsvRowDataSerializationSchema.Builder(rowType).build()
val deserSchema = new CsvRowDataDeserializationSchema.Builder(
  rowType, new RowDataTypeInfo(rowType)).build()

def foo(r: RowData): Unit = {
  val serData = new String(serSchema.serialize(r))
  print(s"${serData}")
  val deserRow = deserSchema.deserialize(serData.getBytes)
  println(s"${deserRow}")
}

val normalRowData = GenericRowData.of(
  GenericRowData.of(BinaryStringData.fromString("hello"),
    BinaryStringData.fromString("world")),
  GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
)
// correct.
foo(normalRowData)

val nullRowData = GenericRowData.of(
  null,
  GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
)
/*
Exception in thread "main" java.io.IOException: Failed to deserialize CSV row ',123;456
...
Caused by: java.lang.RuntimeException: Row length mismatch. 2 fields expected but was 0.
*/
foo(nullRowData)
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
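The failure can be modeled without Flink: the CSV serializer joins a nested row's fields with ';' and writes a null row as an empty string, but on the way back an empty string splits into zero fields rather than a two-field row of nulls. Below is a minimal plain-Scala sketch of that asymmetry; the object and method names are assumptions for illustration, not Flink's actual CsvRowDataDeserializationSchema internals.

```scala
// Illustrative model of the CSV nested-row round-trip asymmetry; not Flink's code.
object CsvNullRowSketch {
  // Serialize: a nested row joins its fields with ';'; a null row becomes "".
  def serializeNested(row: Option[Seq[Any]]): String =
    row.map(_.mkString(";")).getOrElse("")

  // Deserialize: split on ';'. An empty string yields no fields, so a
  // 2-field schema sees 0 fields -> "Row length mismatch".
  def deserializeNested(s: String, expectedFields: Int): Seq[String] = {
    val fields = if (s.isEmpty) Seq.empty[String] else s.split(";", -1).toSeq
    require(fields.length == expectedFields,
      s"Row length mismatch. $expectedFields fields expected but was ${fields.length}.")
    fields
  }

  def main(args: Array[String]): Unit = {
    val ok = serializeNested(Some(Seq(123, 456))) // "123;456"
    println(deserializeNested(ok, 2))             // round-trips fine
    val nul = serializeNested(None)               // "" (null nested row)
    try deserializeNested(nul, 2)
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```

A fix along the lines the reporter asks for would have to map the empty string back to a null row (or a row of nulls) instead of splitting it.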
[jira] [Commented] (FLINK-15719) Exceptions when using scala types directly with the State Process API
[ https://issues.apache.org/jira/browse/FLINK-15719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17186212#comment-17186212 ] Ying Z commented on FLINK-15719: I opened a pull request here: [https://github.com/apache/flink/pull/13266] Could someone review it? Thanks. > Exceptions when using scala types directly with the State Process API > - > > Key: FLINK-15719 > URL: https://issues.apache.org/jira/browse/FLINK-15719 > Project: Flink > Issue Type: Bug > Components: API / State Processor > Affects Versions: 1.9.1 > Reporter: Ying Z > Assignee: Tzu-Li (Gordon) Tai > Priority: Major > Labels: pull-request-available -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15719) Exceptions when using scala types directly with the State Process API
[ https://issues.apache.org/jira/browse/FLINK-15719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17173617#comment-17173617 ] Ying Z commented on FLINK-15719: Hi [~tzulitai], I'd like to help update the doc [1] to make it less error-prone; is that OK? Here is my test code for keyed state in Scala:

# A stateful process function to generate state (inputs: 1 2 3 4 5 6):

{code:java}
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

class StatefulFunctionWithTime extends KeyedProcessFunction[Int, Int, Void] {
  var state: ValueState[Int] = _
  var updateTimes: ListState[Long] = _

  @throws[Exception]
  override def open(parameters: Configuration): Unit = {
    val stateDescriptor = new ValueStateDescriptor("state", createTypeInformation[Int])
    //val stateDescriptor = new ValueStateDescriptor("state", Types.INT)
    state = getRuntimeContext().getState(stateDescriptor)

    val updateDescriptor = new ListStateDescriptor("times", createTypeInformation[Long])
    //val updateDescriptor = new ListStateDescriptor("times", Types.LONG)
    updateTimes = getRuntimeContext().getListState(updateDescriptor)
  }

  @throws[Exception]
  override def processElement(value: Int, ctx: KeyedProcessFunction[Int, Int, Void]#Context, out: Collector[Void]): Unit = {
    state.update(value + 1)
    updateTimes.add(System.currentTimeMillis)
  }
}

object KeyedStateSample extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val fsStateBackend = new FsStateBackend("file:///tmp/chk_dir")
  env.setStateBackend(fsStateBackend)
  env.enableCheckpointing(6)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

  env.socketTextStream("127.0.0.1", 8010)
    .map(_.toInt)
    .keyBy(i => i)
    .process(new StatefulFunctionWithTime)
    .uid("my-uid")

  env.execute()
}
{code}

# Read the state generated by the code above, which outputs:

KeyedState(3,4,List(1596878053283))
KeyedState(5,6,List(1596878055023))
KeyedState(2,3,List(1596878052359))
KeyedState(4,5,List(1596878054098))
KeyedState(6,7,List(1596878056151))
KeyedState(1,2,List(1596878051332))

{code:java}
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.state.api.Savepoint
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

object TestReadState extends App {
  val bEnv = ExecutionEnvironment.getExecutionEnvironment
  val savepoint = Savepoint.load(bEnv,
    "file:///tmp/chk_dir/f988137ef1df4597bebc596ef7c76626/chk-2", new MemoryStateBackend)
  val keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction)
  keyedState.print()

  case class KeyedState(key: Int, value: Int, times: List[Long])

  class ReaderFunction extends KeyedStateReaderFunction[java.lang.Integer, KeyedState] {
    var state: ValueState[Int] = _
    var updateTimes: ListState[Long] = _

    @throws[Exception]
    override def open(parameters: Configuration): Unit = {
      val stateDescriptor = new ValueStateDescriptor("state", createTypeInformation[Int])
      state = getRuntimeContext().getState(stateDescriptor)

      val updateDescriptor = new ListStateDescriptor("times", createTypeInformation[Long])
      updateTimes = getRuntimeContext().getListState(updateDescriptor)
    }

    override def readKey(key: java.lang.Integer, ctx: KeyedStateReaderFunction.Context, out: Collector[KeyedState]): Unit = {
      val data = KeyedState(key, state.value(), updateTimes.get().asScala.toList)
      out.collect(data)
    }
  }
}
{code}

1. https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/state_processor_api.html#keyed-state
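For reference, the per-key bookkeeping that produces those KeyedState lines can be mimicked in plain Scala: each element is its own key (keyBy(i => i)), the value state stores value + 1, and the list state appends a timestamp per update. A self-contained sketch under those assumptions (illustrative only; in the real job the state lives in the configured state backend, and the object name here is made up):

```scala
// In-memory mimic of the per-key state the Flink job above builds up.
object KeyedStateMimic {
  final case class KeyedState(key: Int, value: Int, times: List[Long])

  // Fold the input stream into a per-key map, mirroring processElement:
  // state.update(value + 1) and updateTimes.add(now()).
  def process(inputs: Seq[Int], now: () => Long): Map[Int, KeyedState] =
    inputs.foldLeft(Map.empty[Int, KeyedState]) { (acc, v) =>
      val prevTimes = acc.get(v).map(_.times).getOrElse(Nil)
      acc.updated(v, KeyedState(v, v + 1, prevTimes :+ now()))
    }

  def main(args: Array[String]): Unit =
    process(1 to 6, () => System.currentTimeMillis)
      .values.toList.sortBy(_.key).foreach(println)
}
```

Feeding the same inputs 1..6 reproduces the value = key + 1 pattern visible in the reader's output.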
[jira] [Commented] (FLINK-18782) Retain the column name when converting a Table to a DataStream
[ https://issues.apache.org/jira/browse/FLINK-18782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168996#comment-17168996 ] Ying Z commented on FLINK-18782: [~jark] Thanks for your advice ^_^ I have already tried that way too, and it worked. But then I hit a new scenario like this:

{code:java}
tableEnv.registerDataStream("source_table", sourceStream, 'a, 'proctime.proctime)

val t1 = tableEnv.sqlQuery(
  """
    |SELECT source_table.*, b FROM source_table
    |, LATERAL TABLE(foo(a)) as T(b)
    |""".stripMargin
)
/*
t1 table schema: root
|-- a: INT
|-- b: INT
*/
println(s"t1 table schema: ${t1.getSchema}")
val t1Stream = t1.toAppendStream[Row](t1.getSchema.toRowType)
{code}

which results in a new error message: Exception in thread "main" org.apache.flink.table.api.TableException: The time indicator type is an internal type only.

I can understand that this behavior may be undefined by design, but is there a way to set the field names manually? I tried this way but failed again:

{code:java}
val t1Stream = t1.toAppendStream[Row]
// t1 stream schema: Row(a: Integer, f0: Integer)
println(s"t1 stream schema: ${t1Stream.getType()}")
tableEnv.registerDataStream("t1", t1Stream, 'a, 'b)
{code}

which results in the error message: Exception in thread "main" org.apache.flink.table.api.ValidationException: b is not a field of type Row(a: Integer, f0: Integer). Expected: a, f0

> Retain the column name when converting a Table to a DataStream > -- > > Key: FLINK-18782 > URL: https://issues.apache.org/jira/browse/FLINK-18782 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner > Affects Versions: 1.9.1 > Reporter: Ying Z > Priority: Major
[jira] [Created] (FLINK-18782) How to retain the column name when converting a Table to a DataStream
Ying Z created FLINK-18782: -- Summary: How to retain the column name when converting a Table to a DataStream Key: FLINK-18782 URL: https://issues.apache.org/jira/browse/FLINK-18782 Project: Flink Issue Type: Bug Affects Versions: 1.9.1 Reporter: Ying Z

mail: [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-retain-the-column-name-when-convert-a-Table-to-DataStream-td37002.html]

I met some field-name errors when trying to convert between a Table and a DataStream.

First, init a DataStream and convert it to the table 'source_table', and register a TableFunction named 'foo':

{code:java}
val sourceStream = env.socketTextStream("127.0.0.1", 8010)
  .map(line => line.toInt)
tableEnv.registerDataStream("source_table", sourceStream, 'a)

class Foo() extends TableFunction[(Int)] {
  def eval(col: Int): Unit = collect((col * 10))
}
tableEnv.registerFunction("foo", new Foo)
{code}

Then, use sqlQuery to generate a new table t1 with columns 'a' and 'b':

{code:java}
val t1 = tableEnv.sqlQuery(
  """
    |SELECT source_table.a, b FROM source_table
    |, LATERAL TABLE(foo(a)) as T(b)
    |""".stripMargin
)
/*
t1 table schema: root
|-- a: INT
|-- b: INT
*/
println(s"t1 table schema: ${t1.getSchema}")
{code}

When I try to convert 't1' to a DataStream and then register it as a new table (for some reason) named 't1', the columns change to 'a' and 'f0', not 'a' and 'b':

{code:java}
val t1Stream = t1.toAppendStream[Row]
// t1 stream schema: Row(a: Integer, f0: Integer)
println(s"t1 stream schema: ${t1Stream.getType()}")
tableEnv.registerDataStream("t1", t1Stream)
/*
new t1 table schema: root
|-- a: INT
|-- f0: INT
*/
println(s"new t1 table schema: ${tableEnv.scan("t1").getSchema}")
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
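One plausible reading of the a/f0 result above: when a Table is converted to a DataStream[Row], fields whose names cannot be carried over fall back to generated positional names f0, f1, .... A plain-Scala sketch of such a fallback rule (an assumption for illustration; the object name and the exact rule are not Flink's actual naming code):

```scala
// Illustrative fallback naming: named fields keep their name,
// unnamed fields get f0, f1, ... in order of appearance.
object FieldNameFallback {
  def resolveNames(fields: Seq[Option[String]]): Seq[String] = {
    var nextDefault = -1
    fields.map {
      case Some(name) => name
      case None       => nextDefault += 1; s"f$nextDefault"
    }
  }
}
```

Under this rule a row whose first field kept the name 'a' but whose second field lost its alias resolves to Seq("a", "f0"), matching the Row(a: Integer, f0: Integer) schema seen in the report.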
[jira] [Created] (FLINK-15719) Exceptions when using scala types directly with the State Process API
Ying Z created FLINK-15719: -- Summary: Exceptions when using scala types directly with the State Process API Key: FLINK-15719 URL: https://issues.apache.org/jira/browse/FLINK-15719 Project: Flink Issue Type: Bug Components: API / State Processor Affects Versions: 1.9.1 Reporter: Ying Z

I followed these steps to generate and read states:

# Implement the example[1] `CountWindowAverage` in Scala (exactly the same), and run jobA => this works fine.
# Execute `flink cancel -s ${JobID}` => a savepoint was generated as expected.
# Implement the example[2] `StatefulFunctionWithTime` in Scala (code below), and run jobB => this fails with "Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible."

ReaderFunction code as below:

{code:java}
class ReaderFunction extends KeyedStateReaderFunction[Long, (Long, Long)] {
  var countState: ValueState[(Long, Long)] = _

  override def open(parameters: Configuration): Unit = {
    val stateDescriptor = new ValueStateDescriptor("average", createTypeInformation[(Long, Long)])
    countState = getRuntimeContext().getState(stateDescriptor)
  }

  override def readKey(key: Long, ctx: KeyedStateReaderFunction.Context, out: Collector[(Long, Long)]): Unit = {
    out.collect(countState.value())
  }
}
{code}

1: [https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state]
2: [https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html#keyed-state]

-- This message was sent by Atlassian Jira (v8.3.4#803005)
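The StateMigrationException reduces to a compatibility check: the savepoint records which key serializer wrote the state, and restore rejects a new serializer that is not compatible with the recorded one. A serializer derived from Scala's createTypeInformation[(Long, Long)] is not the same as the serializer the Java-oriented examples produce, so mixing the two trips that check. A toy plain-Scala model of the check (all names here are assumptions for illustration, not Flink's internals):

```scala
// Toy model of a savepoint's key-serializer compatibility check.
object SerializerCompatModel {
  final case class SerializerId(name: String)
  // A snapshot remembers the serializer that wrote the state.
  final case class Snapshot(keySerializer: SerializerId)

  // Restore succeeds only when the new serializer matches the recorded one.
  def restore(snapshot: Snapshot, newKeySerializer: SerializerId): Unit =
    if (snapshot.keySerializer != newKeySerializer)
      throw new IllegalStateException("The new key serializer must be compatible.")
}
```

In this model, writing state with a "scala Tuple2" serializer and restoring with a "java Tuple2" serializer fails exactly the way the reported jobB does; the fix is to use the same type information (and hence the same serializer) on both sides.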