[jira] [Commented] (FLINK-29084) Program argument containing # (pound sign) mistakenly truncated in Kubernetes mode
[ https://issues.apache.org/jira/browse/FLINK-29084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17584207#comment-17584207 ] Weike Dong commented on FLINK-29084: Oh yes, but that issue is still left unresolved. So probably we need to address this issue to prevent further potential "bugs" like this :D -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29084) Program argument containing # (pound sign) mistakenly truncated in Kubernetes mode
[ https://issues.apache.org/jira/browse/FLINK-29084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17583530#comment-17583530 ] Weike Dong commented on FLINK-29084: One possible solution is to base64-encode the program arguments and decode them later, but we may need to change the _flink-clients_ module as well, e.g. _org.apache.flink.client.deployment.application.ApplicationConfiguration#applyToConfiguration_ and _org.apache.flink.client.deployment.application.ApplicationConfiguration#fromConfiguration_. Hi [~wangyang0918] do you have time to have a look at this issue? Thanks : )
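The base64 idea could look like the following sketch. The class and method names here are hypothetical, not Flink API; the point is only that base64 output uses just [A-Za-z0-9+/=], so no '#' can ever appear in the serialized value:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the proposed workaround: encode each program
// argument before it is written into the ConfigMap-backed configuration,
// and decode it again on the application side.
public class ArgCodecSketch {

    static List<String> encodeArgs(List<String> args) {
        return args.stream()
                .map(a -> Base64.getEncoder()
                        .encodeToString(a.getBytes(StandardCharsets.UTF_8)))
                .collect(Collectors.toList());
    }

    static List<String> decodeArgs(List<String> encoded) {
        return encoded.stream()
                .map(e -> new String(Base64.getDecoder().decode(e),
                        StandardCharsets.UTF_8))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> original = List.of("ab#cd", "--key", "value#1");
        // Base64 output contains no '#', so nothing is lost to YAML comments.
        System.out.println(decodeArgs(encodeArgs(original)).equals(original)); // prints "true"
    }
}
```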
[jira] [Created] (FLINK-29084) Program argument containing # (pound sign) mistakenly truncated in Kubernetes mode
Weike Dong created FLINK-29084: -- Summary: Program argument containing # (pound sign) mistakenly truncated in Kubernetes mode Key: FLINK-29084 URL: https://issues.apache.org/jira/browse/FLINK-29084 Project: Flink Issue Type: Bug Components: Deployment / Kubernetes Affects Versions: 1.15.1, 1.14.5, 1.13.6 Environment: Flink 1.13.6 Native Kubernetes (Application Mode) Reporter: Weike Dong We have found that when submitting jobs in native-Kubernetes mode, the main arguments of the Flink program are truncated if they contain a # character. For example, if we pass 'ab#cd' as an argument for a Flink program, Flink actually gets only 'ab' from the variable `$internal.application.program-args` at runtime. After searching through the code, we found the reason might be that when `org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator#buildAccompanyingKubernetesResources` transforms the Flink config data `Map` into a `ConfigMap`, the fabric8 Kubernetes client converts it to YAML internally, without any escaping. Afterwards, when there is a # character in a YAML line, the decoder treats it as the start of a comment, so the substring after the # character is erroneously ignored.
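The failure mode described above can be reproduced with a deliberately naive line-based loader (an illustration only, not Flink's actual parsing code): any value written unquoted loses everything from the first '#' onward.

```java
import java.util.HashMap;
import java.util.Map;

// Naive illustration of a loader that treats everything after '#' on a
// line as a comment, mirroring the truncation described in this issue.
public class NaiveConfLoader {

    static Map<String, String> load(String[] lines) {
        Map<String, String> conf = new HashMap<>();
        for (String raw : lines) {
            int hash = raw.indexOf('#');
            String line = (hash >= 0 ? raw.substring(0, hash) : raw).trim();
            int colon = line.indexOf(": ");
            if (colon < 0) {
                continue; // blank line or pure comment
            }
            conf.put(line.substring(0, colon).trim(),
                     line.substring(colon + 2).trim());
        }
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf =
                load(new String[] {"$internal.application.program-args: ab#cd"});
        System.out.println(conf.get("$internal.application.program-args")); // prints "ab"
    }
}
```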
[jira] [Commented] (FLINK-28757) Increase precision of TIME fields in JSON Format
[ https://issues.apache.org/jira/browse/FLINK-28757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17573526#comment-17573526 ] Weike Dong commented on FLINK-28757: Related to FLINK-17525 and FLINK-19872, but still not fully resolved.
[jira] [Created] (FLINK-28757) Increase precision of TIME fields in JSON Format
Weike Dong created FLINK-28757: -- Summary: Increase precision of TIME fields in JSON Format Key: FLINK-28757 URL: https://issues.apache.org/jira/browse/FLINK-28757 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.15.1, 1.14.5, 1.13.6 Reporter: Weike Dong Currently, TIME fields in the JSON Format only support a precision of zero decimal places, like '12:33:16', regardless of the precision of the original data ingested from the sources. However, since Flink internally uses an Integer (milliseconds of the day) to store the values of TIME fields, the precision could easily be extended to 3 decimal places, like '12:33:16.235', which has been confirmed by switching to other formats or by modifying _org.apache.flink.formats.json.JsonToRowDataConverters#convertToTime_. Hence I propose to extend the precision of TIME fields in the JSON Format from 0 to 3.
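Because the internal representation is an int of milliseconds of the day, rendering three fractional digits is lossless. A minimal sketch of the conversion (the method name is illustrative, not the actual Flink converter):

```java
import java.time.LocalTime;

// Sketch: a TIME value stored as an int of milliseconds of the day can
// always be rendered with millisecond precision.
public class TimeMillisSketch {

    static String formatMillisOfDay(int millisOfDay) {
        // 1 ms == 1_000_000 ns; LocalTime.toString prints the fractional
        // part only when it is non-zero.
        return LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L).toString();
    }

    public static void main(String[] args) {
        System.out.println(formatMillisOfDay(45_196_235)); // prints "12:33:16.235"
    }
}
```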
[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData
[ https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weike Dong updated FLINK-28674: --- Attachment: (was: image-2022-07-25-20-56-14-111.png) > EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in > BinaryRowData > -- > > Key: FLINK-28674 > URL: https://issues.apache.org/jira/browse/FLINK-28674 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.13.6, 1.14.5, 1.15.1 > Environment: Flink 1.13.6 >Reporter: Weike Dong >Priority: Major > > UPDATE: Not a problem
[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData
[ https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weike Dong updated FLINK-28674: --- Description: UPDATE: Not a problem (was: Hi Devs, Recently I have discovered that the _equaliser.equals_ call in _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_ generates wrong comparison results when two binary rows are the same, like !image-2022-07-25-20-56-14-111.png! After digging through the generated code for this equaliser, I have found that when the two input RowData are all instances of {_}BinaryRowData{_}, the _BinaryRowData#equals_ method is directly called to give the comparison result. !image-2022-07-25-20-59-31-933.png! However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot properly handle complex data types like {_}Timestamp{_}, so it returns _false_ even when the actual timestamp values are the same, causing _SinkUpsertMaterializer_ to falsely think that there are no matches in the states, hence printing errors like "The state is cleared because of state ttl", which eventually leads to the loss of -U data in the final results. P.S. the _equals_ method of _BinaryRowData_ actually compares the underlying MemorySegments, !image-2022-07-25-21-17-33-608.png!)
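The core claim in the original description — byte-wise equality can disagree with logical equality when one logical value has more than one encoding — can be shown with a small standalone sketch. The two layouts below are made up for illustration and are not BinaryRowData's actual memory format:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustration: the same logical timestamp encoded two different ways
// compares unequal byte-wise but equal field-wise.
public class ByteVsLogicalEquals {

    public static void main(String[] args) {
        long millis = 1_658_752_000_000L; // one logical timestamp

        // encoding A: 8-byte millis only
        byte[] a = ByteBuffer.allocate(8).putLong(millis).array();
        // encoding B: 8-byte millis plus a 4-byte nanos-of-milli field
        byte[] b = ByteBuffer.allocate(12).putLong(millis).putInt(0).array();

        System.out.println(Arrays.equals(a, b)); // prints "false"
        System.out.println(ByteBuffer.wrap(a).getLong()
                == ByteBuffer.wrap(b).getLong()); // prints "true"
    }
}
```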
[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData
[ https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weike Dong updated FLINK-28674: --- Attachment: (was: image-2022-07-25-20-59-31-933.png)
[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData
[ https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weike Dong updated FLINK-28674: --- Attachment: (was: image-2022-07-25-21-17-33-608.png)
[jira] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData
[ https://issues.apache.org/jira/browse/FLINK-28674 ] Weike Dong deleted comment on FLINK-28674: was (Author: kyledong): Hi [~lzljs3620320] [~airblader] , could you please have a look at this issue? Thank you : )
[jira] [Comment Edited] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData
[ https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17570903#comment-17570903 ] Weike Dong edited comment on FLINK-28674 at 7/25/22 1:25 PM: - Actually caused by missing timestamp settings in CDC connector options, not a problem in Equaliser. was (Author: kyledong): Actually caused by missing timestamp settings in CDC connector options.
[jira] [Closed] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData
[ https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weike Dong closed FLINK-28674. -- Resolution: Not A Problem Actually caused by missing timestamp settings in CDC connector options.
[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData
[ https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weike Dong updated FLINK-28674: --- Attachment: image-2022-07-25-21-17-33-608.png
[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData
[ https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weike Dong updated FLINK-28674: --- Description: Hi Devs, Recently I have discovered that the _equaliser.equals_ call in _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_ generates wrong comparison results when two binary rows are the same, like !image-2022-07-25-20-56-14-111.png! After digging through the generated code for this equaliser, I have found that when the two input RowData are all instances of {_}BinaryRowData{_}, the _BinaryRowData#equals_ method is directly called to give the comparison result. !image-2022-07-25-20-59-31-933.png! However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot properly handle complex data types like {_}Timestamp{_}, so it returns _false_ even when the actual timestamp values are the same, causing _SinkUpsertMaterializer_ to falsely think that there are no matches in the states, hence printing errors like "The state is cleared because of state ttl", which eventually leads to the loss of -U data in the final results. P.S. the _equals_ method of _BinaryRowData_ actually compares the underlying MemorySegments, !image-2022-07-25-21-17-33-608.png! was: Hi Devs, Recently I have discovered that the _equaliser.equals_ call in _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_ generates wrong comparison results when two binary rows are the same, like !image-2022-07-25-20-56-14-111.png! After digging through the generated code for this equaliser, I have found that when the two input RowData are all instances of {_}BinaryRowData{_}, the _BinaryRowData#equals_ method is directly called to give the comparison result. !image-2022-07-25-20-59-31-933.png! However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot properly handle complex data types like {_}Timestamp{_}, so it returns _false_ even when the actual timestamp values are the same, causing SinkUpsertMaterializer to falsely think that there are no matches in the states, hence printing errors like "The state is cleared because of state ttl", which eventually leads to the loss of -U data in the final results. P.S. the equals method of BinaryRowData actually compares the underlying MemorySegments, which is not suitable for types like Timestamp
[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData
[ https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weike Dong updated FLINK-28674: --- Description: Hi Devs, Recently I have discovered that the _equaliser.equals_ call in _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_ generates wrong comparison results when two binary rows are the same, like !image-2022-07-25-20-56-14-111.png! After digging through the generated code for this equaliser, I have found that when the two input RowData are all instances of {_}BinaryRowData{_}, the _BinaryRowData#equals_ method is directly called to give the comparison result. !image-2022-07-25-20-59-31-933.png! However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot properly handle complex data types like {_}Timestamp{_}, so it returns _false_ even when the actual timestamp values are the same, causing SinkUpsertMaterializer to falsely think that there are no matches in the states, hence printing errors like "The state is cleared because of state ttl", which eventually leads to the loss of -U data in the final results. P.S. the equals method of BinaryRowData actually compares the underlying MemorySegments, which is not suitable for types like Timestamp was: Hi Devs, Recently I have discovered that the _equaliser.equals_ call in _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_ generates wrong comparison results when two binary rows are the same, like !image-2022-07-25-20-56-14-111.png! After digging through the generated code for this equaliser, I have found that when the two input RowData are all instances of {_}BinaryRowData{_}, the _BinaryRowData#equals_ method is directly called to give the comparison result. !image-2022-07-25-20-59-31-933.png! However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot properly handle complex data types like {_}Timestamp{_}, so it returns _false_ even when the actual timestamp values are the same, causing SinkUpsertMaterializer to falsely think that there are no matches in the states, hence printing errors like "The state is cleared because of state ttl", which eventually leads to the loss of -U data in the final results.
[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData
[ https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weike Dong updated FLINK-28674: --- Description: Hi Devs, Recently I have discovered that the _equaliser.equals_ call in _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_ generates wrong comparison results when two binary rows are the same, like !image-2022-07-25-20-56-14-111.png! After digging through the generated code for this equaliser, I have found that when the two input RowData are all instances of {_}BinaryRowData{_}, the _BinaryRowData#equals_ method is directly called to give the comparison result. !image-2022-07-25-20-59-31-933.png! However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot properly handle complex data types like {_}Timestamp{_}, so it returns _false_ even when the actual timestamp values are the same, causing SinkUpsertMaterializer to falsely think that there are no matches in the states, hence printing errors like "The state is cleared because of state ttl", which eventually leads to the loss of -U data in the final results. was: Hi Devs, Recently I have discovered that the _equaliser.equals_ call in _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_ generates wrong comparison results when two binary rows are the same, like !image-2022-07-25-20-56-14-111.png! After digging through the generated code for this equaliser, I have found that when the two input RowData are all instances of {_}BinaryRowData{_}, the _BinaryRowData#equals_ method is directly called to give the comparison result. !image-2022-07-25-20-59-31-933.png! However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot properly handle complex data types like {_}Timestamp{_}, so it returns _false_ even when the actual timestamp values are the same, causing SinkUpsertMaterializer to falsely think that there are no matches in the states, hence printing errors like "The state is cleared because of state ttl", which eventually leads to the loss of -U data in the final results.
[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData
[ https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weike Dong updated FLINK-28674: --- Description: Hi Devs, Recently I have discovered that the _equaliser.equals_ call in _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_ generates wrong comparison results when two binary rows are the same, like !image-2022-07-25-20-56-14-111.png! After digging through the generated code for this equaliser, I have found that when the two input RowData are all instances of {_}BinaryRowData{_}, the _BinaryRowData#equals_ method is directly called to give the comparison result. !image-2022-07-25-20-59-31-933.png! However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot properly handle complex data types like {_}Timestamp{_}, so it returns _false_ even when the actual timestamp values are the same, causing SinkUpsertMaterializer to falsely think that there are no matches in the states, hence printing errors like "The state is cleared because of state ttl", which eventually leads to the loss of -U data in the final results. was: Hi Devs, Recently I have discovered that the _equaliser.equals_ call in _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_ generates wrong comparison results when two binary rows are the same, like !image-2022-07-25-20-56-14-111.png! After digging through the generated code for this equaliser, I have found that when the two input RowData are all instances of {_}BinaryRowData{_}, thus _BinaryRowData#equals_ method is directly called to compare the two rows. !image-2022-07-25-20-59-31-933.png! 
However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot properly handle complex data types like {_}Timestamp{_}, so it returns _false_ even when the actual timestamp values are the same, causing SinkUpsertMaterializer to falsely think that there are no matches in the states, hence printing errors like "The state is cleared because of state ttl", which eventually leads to the loss of -U data in the final results. > EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in > BinaryRowData > -- > > Key: FLINK-28674 > URL: https://issues.apache.org/jira/browse/FLINK-28674 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.13.6, 1.14.5, 1.15.1 > Environment: Flink 1.13.6 >Reporter: Weike Dong >Priority: Major > Attachments: image-2022-07-25-20-56-14-111.png, > image-2022-07-25-20-59-31-933.png > > > Hi Devs, > Recently I have discovered that the _equaliser.equals_ call in > _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_ > generates wrong comparison results when two binary rows are the same, like > !image-2022-07-25-20-56-14-111.png! > After digging through the generated code for this equaliser, I have found > that when the two input RowData are all instances of {_}BinaryRowData{_}, the > _BinaryRowData#equals_ method is directly called to give the comparison > result. > !image-2022-07-25-20-59-31-933.png! > However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot > properly handle complex data types like {_}Timestamp{_}, so it returns > _false_ even when the actual timestamp values are the same, causing > SinkUpsertMaterializer to falsely think that there are no matches in the > states, hence printing errors like "The state is cleared because of state > ttl", which eventually leads to the loss of -U data in the final results. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData
[ https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17570890#comment-17570890 ] Weike Dong commented on FLINK-28674: Hi [~lzljs3620320] [~airblader] , could you please have a look at this issue? Thank you : ) > EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in > BinaryRowData > -- > > Key: FLINK-28674 > URL: https://issues.apache.org/jira/browse/FLINK-28674 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.13.6, 1.14.5, 1.15.1 > Environment: Flink 1.13.6 >Reporter: Weike Dong >Priority: Major > Attachments: image-2022-07-25-20-56-14-111.png, > image-2022-07-25-20-59-31-933.png > > > Hi Devs, > Recently I have discovered that the _equaliser.equals_ call in > _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_ > generates wrong comparison results when two binary rows are the same, like > !image-2022-07-25-20-56-14-111.png! > After digging through the generated code for this equaliser, I have found > that when the two input RowData are all instances of {_}BinaryRowData{_}, > thus _BinaryRowData#equals_ method is directly called to compare the two rows. > !image-2022-07-25-20-59-31-933.png! > However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot > properly handle complex data types like {_}Timestamp{_}, so it returns > _false_ even when the actual timestamp values are the same, causing > SinkUpsertMaterializer to falsely think that there are no matches in the > states, hence printing errors like "The state is cleared because of state > ttl", which eventually leads to the loss of -U data in the final results. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData
[ https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weike Dong updated FLINK-28674: --- Description: Hi Devs, Recently I have discovered that the _equaliser.equals_ call in _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_ generates wrong comparison results when two binary rows are the same, like !image-2022-07-25-20-56-14-111.png! After digging through the generated code for this equaliser, I have found that when the two input RowData are all instances of {_}BinaryRowData{_}, thus _BinaryRowData#equals_ method is directly called to compare the two rows. !image-2022-07-25-20-59-31-933.png! However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot properly handle complex data types like {_}Timestamp{_}, so it returns _false_ even when the actual timestamp values are the same, causing SinkUpsertMaterializer to falsely think that there are no matches in the states, hence printing errors like "The state is cleared because of state ttl", which eventually leads to the loss of -U data in the final results. was: Hi Devs, Recently I have discovered that the _equaliser.equals_ call in _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_ generates wrong comparison results when two binary rows are the same, like !image-2022-07-25-20-56-14-111.png! After digging through the generated code for this equaliser, I have found that when the two input RowData are all instances of {_}BinaryRowData{_}, thus _BinaryRowData#equals_ method is directly called to compare the two rows. !image-2022-07-25-20-59-31-933.png! However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot properly handle complex data types like {_}Timestamp{_}, so it returns _false_ even when the actual timestamp values are the same. 
> EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in > BinaryRowData > -- > > Key: FLINK-28674 > URL: https://issues.apache.org/jira/browse/FLINK-28674 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.13.6, 1.14.5, 1.15.1 > Environment: Flink 1.13.6 >Reporter: Weike Dong >Priority: Major > Attachments: image-2022-07-25-20-56-14-111.png, > image-2022-07-25-20-59-31-933.png > > > Hi Devs, > Recently I have discovered that the _equaliser.equals_ call in > _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_ > generates wrong comparison results when two binary rows are the same, like > !image-2022-07-25-20-56-14-111.png! > After digging through the generated code for this equaliser, I have found > that when the two input RowData are all instances of {_}BinaryRowData{_}, > thus _BinaryRowData#equals_ method is directly called to compare the two rows. > !image-2022-07-25-20-59-31-933.png! > However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot > properly handle complex data types like {_}Timestamp{_}, so it returns > _false_ even when the actual timestamp values are the same, causing > SinkUpsertMaterializer to falsely think that there are no matches in the > states, hence printing errors like "The state is cleared because of state > ttl", which eventually leads to the loss of -U data in the final results. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData
[ https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weike Dong updated FLINK-28674: --- Description: Hi Devs, Recently I have discovered that the _equaliser.equals_ call in _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_ generates wrong comparison results when two binary rows are the same, like !image-2022-07-25-20-56-14-111.png! After digging through the generated code for this equaliser, I have found that when the two input RowData are all instances of {_}BinaryRowData{_}, thus _BinaryRowData#equals_ method is directly called to compare the two rows. !image-2022-07-25-20-59-31-933.png! However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot properly handle complex data types like {_}Timestamp{_}, so it returns _false_ even when the actual timestamp values are the same. was: Hi Devs, Recently I have discovered that the _equaliser.equals_ call in _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_ generates wrong comparison results when two binary rows are the same, like !image-2022-07-25-20-56-14-111.png! After digging through the generated code for this equaliser, I have found that when the two input RowData are all instances of {_}BinaryRowData{_}, then _BinaryRowData#equals_ method is called to compare the two rows. !image-2022-07-25-20-59-31-933.png! However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot properly handle complex data types like {_}Timestamp{_}, so it returns _false_ even when the actual timestamp values are the same. 
> EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in > BinaryRowData > -- > > Key: FLINK-28674 > URL: https://issues.apache.org/jira/browse/FLINK-28674 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.13.6, 1.14.5, 1.15.1 > Environment: Flink 1.13.6 >Reporter: Weike Dong >Priority: Major > Attachments: image-2022-07-25-20-56-14-111.png, > image-2022-07-25-20-59-31-933.png > > > Hi Devs, > Recently I have discovered that the _equaliser.equals_ call in > _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_ > generates wrong comparison results when two binary rows are the same, like > !image-2022-07-25-20-56-14-111.png! > After digging through the generated code for this equaliser, I have found > that when the two input RowData are all instances of {_}BinaryRowData{_}, > thus _BinaryRowData#equals_ method is directly called to compare the two rows. > !image-2022-07-25-20-59-31-933.png! > However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot > properly handle complex data types like {_}Timestamp{_}, so it returns > _false_ even when the actual timestamp values are the same. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData
Weike Dong created FLINK-28674: -- Summary: EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData Key: FLINK-28674 URL: https://issues.apache.org/jira/browse/FLINK-28674 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.15.1, 1.14.5, 1.13.6 Environment: Flink 1.13.6 Reporter: Weike Dong Attachments: image-2022-07-25-20-56-14-111.png, image-2022-07-25-20-59-31-933.png Hi Devs, Recently I have discovered that the _equaliser.equals_ call in _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_ generates wrong comparison results when two binary rows are the same, like !image-2022-07-25-20-56-14-111.png! After digging through the generated code for this equaliser, I have found that when the two input RowData are all instances of {_}BinaryRowData{_}, then _BinaryRowData#equals_ method is called to compare the two rows. !image-2022-07-25-20-59-31-933.png! However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot properly handle complex data types like {_}Timestamp{_}, so it returns _false_ even when the actual timestamp values are the same. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-19677) TaskManager takes abnormally long time to register with JobManager on Kubernetes
Weike Dong created FLINK-19677: -- Summary: TaskManager takes abnormally long time to register with JobManager on Kubernetes Key: FLINK-19677 URL: https://issues.apache.org/jira/browse/FLINK-19677 Project: Flink Issue Type: Bug Components: Runtime / Task Affects Versions: 1.11.2, 1.11.1, 1.11.0 Reporter: Weike Dong During the registration process of a TaskManager, the JobManager creates a _TaskManagerLocation_ instance, which tries to get the hostname of the TaskManager via a reverse DNS lookup. However, this always fails in a Kubernetes environment, because the IPs of pods that are not exposed by Services cannot be resolved to domain names by coredns, so _InetAddress#getCanonicalHostName()_ takes ~5 seconds to return, blocking the whole registration process. Therefore, Flink should provide a configuration parameter to turn off the reverse DNS lookup. Also, even when the hostname is actually needed, the lookup could be performed lazily to avoid blocking the registration of other TaskManagers. -- This message was sent by Atlassian Jira (v8.3.4#803005)
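The lazy-lookup idea from the report above can be sketched as follows. The names are illustrative, not Flink's actual TaskManagerLocation API; the resolver is injected as a supplier so that the potentially slow reverse DNS call is paid on first use of the hostname rather than during registration, with a fall-back to the raw IP when resolution fails:

```java
import java.util.function.Supplier;

/** Sketch: registration stores only the IP; the reverse-DNS lookup runs
 *  lazily on first access of the hostname. Hypothetical names, not Flink's. */
public class LazyLocation {
    private final String ip;
    private final Supplier<String> reverseLookup; // may be slow (e.g. ~5s timeout)
    private volatile String hostname;             // resolved at most once

    LazyLocation(String ip, Supplier<String> reverseLookup) {
        this.ip = ip;
        this.reverseLookup = reverseLookup;
    }

    /** Cheap accessor, safe to use during registration. */
    String ip() {
        return ip;
    }

    /** Pays the lookup cost lazily; falls back to the IP on failure. */
    String hostname() {
        String h = hostname;
        if (h == null) {
            synchronized (this) {
                if (hostname == null) {
                    String resolved = reverseLookup.get();
                    hostname = (resolved != null) ? resolved : ip;
                }
                h = hostname;
            }
        }
        return h;
    }
}
```

With this shape, a registration path that only needs `ip()` never blocks, and only a caller that actually asks for the hostname waits for (or skips) the DNS round trip.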
[jira] [Commented] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration
[ https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17156587#comment-17156587 ] Weike Dong commented on FLINK-18452: Yes, I agree that we can fix this bug in the next major release, i.e. 1.12.0; since there are currently no other user reports of this problem, it does not seem urgent enough to change the existing class structure. > Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent > state access after restoration > --- > > Key: FLINK-18452 > URL: https://issues.apache.org/jira/browse/FLINK-18452 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.10.0, 1.10.1, 1.11.0 >Reporter: Weike Dong >Assignee: Weike Dong >Priority: Major > Fix For: 1.12.0 > > Attachments: > c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png > > > We found that in SQL jobs using "Top-N" functionality provided by the blink > planner, the job state cannot be retrieved because of "incompatible" state > serializers (in fact they are compatible). > The error log is displayed like below > {panel:title=taskmanager.log} > 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], > rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], > partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, > serverid, quantity]) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task > - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], > orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) > (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED. 
> java.lang.RuntimeException: Error while getting state > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62) > at > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144) > at > org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.StateMigrationException: The new state > serializer cannot be incompatible. 
> at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652) > at > org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) > ... 13 more{panel} > > After careful debugging, it is found to be an issue with the compatibility > check of type serializers. > > In short, during checkpointing, Flink serializes
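The kind of flaw FLINK-18452 describes — an equality check stricter than logical equivalence — can be sketched as follows. All names here are hypothetical, not Flink's actual classes; the auto-generated class-name counter (like the `StreamExecSortComparator$579` mentioned in the report) is the part that changes across job restarts, so comparing generated code text fails even though the sort specification is identical:

```java
import java.util.Arrays;

/** Sketch of the suspected flaw: equality based on generated code text breaks
 *  when only an auto-generated class name differs, while equality based on
 *  the logical sort spec survives restoration. Illustrative names only. */
public class ComparatorEquality {

    /** Too strict: the "$NNN" counter in the class name differs per run. */
    static boolean codeTextEquals(String codeA, String codeB) {
        return codeA.equals(codeB);
    }

    /** Compares what actually matters: sort field indices and orders. */
    static boolean metaInfoEquals(int[] fieldsA, boolean[] ascA,
                                  int[] fieldsB, boolean[] ascB) {
        return Arrays.equals(fieldsA, fieldsB) && Arrays.equals(ascA, ascB);
    }

    public static void main(String[] args) {
        String before = "class StreamExecSortComparator$579 { /* quantity DESC */ }";
        String after  = "class StreamExecSortComparator$12 { /* quantity DESC */ }";
        System.out.println(codeTextEquals(before, after));  // false: spurious mismatch
        System.out.println(metaInfoEquals(
                new int[] {2}, new boolean[] {false},
                new int[] {2}, new boolean[] {false}));     // true: same sort spec
    }
}
```

A spurious `false` here is exactly what turns a compatible restored serializer into a `StateMigrationException` at restore time.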
[jira] [Updated] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration
[ https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weike Dong updated FLINK-18452: --- Fix Version/s: 1.12.0 > Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent > state access after restoration > --- > > Key: FLINK-18452 > URL: https://issues.apache.org/jira/browse/FLINK-18452 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.10.0, 1.10.1, 1.11.0 >Reporter: Weike Dong >Assignee: Weike Dong >Priority: Major > Fix For: 1.12.0 > > Attachments: > c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png > > > We found that in SQL jobs using "Top-N" functionality provided by the blink > planner, the job state cannot be retrieved because of "incompatible" state > serializers (in fact they are compatible). > The error log is displayed like below > {panel:title=taskmanager.log} > 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], > rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], > partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, > serverid, quantity]) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task > - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], > orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) > (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED. 
> java.lang.RuntimeException: Error while getting state > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62) > at > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144) > at > org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.StateMigrationException: The new state > serializer cannot be incompatible. 
> at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652) > at > org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) > ... 13 more{panel} > > After careful debugging, it is found to be an issue with the compatibility > check of type serializers. > > In short, during checkpointing, Flink serializes _SortedMapSerializer_ by > creating a _SortedMapSerializerSnapshot_ object, and the original comparator > is encapsulated within the object (here we call it > _StreamExecSortComparator$579_). > > At restoration, the
[jira] [Commented] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration
[ https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148388#comment-17148388 ] Weike Dong commented on FLINK-18452: One important thing to note here is backward compatibility. For example, _GeneratedRecordComparator_ instances created by current or earlier Flink versions do not, AFAIK, carry the meta info needed to compare them against new instances that do. To maintain compatibility, a hacky approach is to extract the relevant fields from the generated code text; however, this is error-prone and should probably serve only as a fallback when no other method is available. Alternatively, we could add a new class (like _GeneratedRecordComparatorV2_) that carries the required meta info and "migrate" the current implementation to it where needed. That way, we could enumerate all of the comparator code-generation implementations in existing Flink versions and provide a robust migration plan. > Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent > state access after restoration > --- > > Key: FLINK-18452 > URL: https://issues.apache.org/jira/browse/FLINK-18452 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.10.0, 1.10.1, 1.11.0 >Reporter: Weike Dong >Priority: Major > Attachments: > c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png > > > We found that in SQL jobs using "Top-N" functionality provided by the blink > planner, the job state cannot be retrieved because of "incompatible" state > serializers (in fact they are compatible). 
> The error log is displayed like below > {panel:title=taskmanager.log} > 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], > rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], > partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, > serverid, quantity]) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task > - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], > orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) > (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED. > java.lang.RuntimeException: Error while getting state > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62) > at > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144) > at > org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) > at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.StateMigrationException: The new state > serializer cannot be incompatible. > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652) > at > org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279) > at
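The migration idea proposed in the comment on backward compatibility can be sketched like this. _GeneratedRecordComparatorV2_ is hypothetical, as in the comment itself; `sortSpec` stands in for whatever meta info the code generator would record, and the code-text check remains only as the error-prone legacy fallback:

```java
import java.util.Objects;

/** Sketch of the proposed migration path (hypothetical names): a V2
 *  comparator carries explicit sort meta info, and comparison falls back to
 *  the legacy code-text check only when one side predates V2. */
public class ComparatorCompat {

    static final class GeneratedComparator {
        final String codeText; // always present (legacy field)
        final String sortSpec; // V2 meta info; null for legacy instances

        GeneratedComparator(String codeText, String sortSpec) {
            this.codeText = codeText;
            this.sortSpec = sortSpec;
        }
    }

    static boolean compatible(GeneratedComparator a, GeneratedComparator b) {
        if (a.sortSpec != null && b.sortSpec != null) {
            return a.sortSpec.equals(b.sortSpec);      // robust V2 path
        }
        return Objects.equals(a.codeText, b.codeText); // error-prone legacy fallback
    }

    public static void main(String[] args) {
        GeneratedComparator restored =
                new GeneratedComparator("class C$579 { /* ... */ }", "quantity DESC");
        GeneratedComparator fresh =
                new GeneratedComparator("class C$12 { /* ... */ }", "quantity DESC");
        System.out.println(compatible(restored, fresh)); // true: meta info matches
    }
}
```

The enumerate-and-migrate plan from the comment would then amount to mapping each known legacy code-generation variant onto a `sortSpec`, after which the fallback branch is never taken for migrated state.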
[jira] [Commented] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration
[ https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148348#comment-17148348 ] Weike Dong commented on FLINK-18452: Thank you [~jark] for the reply, and I agree that the meta info given to _org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator#gen_ is more suitable for the comparison (maybe there are other classes that have the same problem). I am quite interested in solving this issue, could you please assign this ticket to me? Thank you very much : ) > Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent > state access after restoration > --- > > Key: FLINK-18452 > URL: https://issues.apache.org/jira/browse/FLINK-18452 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.10.0, 1.10.1, 1.11.0 >Reporter: Weike Dong >Priority: Major > Attachments: > c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png > > > We found that in SQL jobs using "Top-N" functionality provided by the blink > planner, the job state cannot be retrieved because of "incompatible" state > serializers (in fact they are compatible). > The error log is displayed like below > {panel:title=taskmanager.log} > 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], > rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], > partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, > serverid, quantity]) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task > - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], > orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) > (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED. 
> java.lang.RuntimeException: Error while getting state > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62) > at > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144) > at > org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.StateMigrationException: The new state > serializer cannot be incompatible. 
> at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652) > at > org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) > ... 13 more{panel} > > After careful debugging, it is found to be an issue with the compatibility
[jira] [Comment Edited] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration
[ https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148289#comment-17148289 ] Weike Dong edited comment on FLINK-18452 at 6/30/20, 3:46 AM: -- Hi [~jinyu.zj], would you please be so kind to look at this issue, as you are the original author for the code. Thank you very much. was (Author: kyledong): Hi [~jinyu.zj], would you please be so kind to look at this issus, as you are the original author for the code. Thank you very much. > Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent > state access after restoration > --- > > Key: FLINK-18452 > URL: https://issues.apache.org/jira/browse/FLINK-18452 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.10.0, 1.10.1, 1.11.0 >Reporter: Weike Dong >Priority: Major > Attachments: > c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png > > > We found that in SQL jobs using "Top-N" functionality provided by the blink > planner, the job state cannot be retrieved because of "incompatible" state > serializers (in fact they are compatible). > The error log is displayed like below > {panel:title=taskmanager.log} > 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], > rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], > partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, > serverid, quantity]) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task > - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], > orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) > (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED. 
> java.lang.RuntimeException: Error while getting state > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62) > at > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144) > at > org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.StateMigrationException: The new state > serializer cannot be incompatible. 
> at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652) > at > org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) > ... 13 more{panel} > > After careful debugging, it is found to be an issue with the compatibility > check of type
[jira] [Commented] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration
[ https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148289#comment-17148289 ] Weike Dong commented on FLINK-18452: Hi [~jinyu.zj], would you please be so kind as to look at this issue, as you are the original author of the code. Thank you very much. > Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent > state access after restoration > --- > > Key: FLINK-18452 > URL: https://issues.apache.org/jira/browse/FLINK-18452 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.10.0, 1.10.1, 1.11.0 >Reporter: Weike Dong >Priority: Major > Attachments: > c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png > > > We found that in SQL jobs using "Top-N" functionality provided by the blink > planner, the job state cannot be retrieved because of "incompatible" state > serializers (in fact they are compatible). > The error log is shown below > {panel:title=taskmanager.log} > 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], > rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], > partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, > serverid, quantity]) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task > - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], > orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) > (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED. 
> java.lang.RuntimeException: Error while getting state > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62) > at > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144) > at > org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.StateMigrationException: The new state > serializer cannot be incompatible. 
> at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652) > at > org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) > ... 13 more{panel} > > After careful debugging, it is found to be an issue with the compatibility > check of type serializers. > > In short, during checkpointing, Flink serializes _SortedMapSerializer_ by > creating a _SortedMapSerializerSnapshot_ object, and the original comparator > is encapsulated within the object
[jira] [Created] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration
Weike Dong created FLINK-18452: -- Summary: Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration Key: FLINK-18452 URL: https://issues.apache.org/jira/browse/FLINK-18452 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.10.1, 1.10.0, 1.11.0 Reporter: Weike Dong Attachments: c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png We found that in SQL jobs using "Top-N" functionality provided by the blink planner, the job state cannot be retrieved because of "incompatible" state serializers (in fact they are compatible). The error log is displayed like below {panel:title=taskmanager.log} 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, serverid, quantity]) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED. 
java.lang.RuntimeException: Error while getting state at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62) at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144) at org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible. 
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652) at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) ... 13 more{panel} After careful debugging, it is found to be an issue with the compatibility check of type serializers. In short, during checkpointing, Flink serializes _SortedMapSerializer_ by creating a _SortedMapSerializerSnapshot_ object, and the original comparator is encapsulated within the object (here we call it _StreamExecSortComparator$579_). At restoration, the object is read and restored as normal. However, during the construction of RetractableTopNFunction instance, another Comparator is provided by Flink as an argument (we call it _StreamExecSortComparator$626_), and it is later used in the _ValueStateDescriptor_ which acts like a key to the state store. Here comes the problem: when the newly-restored Flink program tries to access state (_getState_) through the previously mentioned _ValueStateDescriptor_, the State Backend firstly
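The failure mode described above can be sketched as follows. This is a minimal, hypothetical model (not Flink's actual `ComparatorWrapper` class): the generated comparator's class name differs between sessions (e.g. `StreamExecSortComparator$579` vs. `StreamExecSortComparator$626`), so an equality check that involves the session-specific name reports two logically identical comparators as different, while an equality check that normalizes the generated name away would accept them.

```java
// Hypothetical sketch of the equals() flaw: two code-generated comparators
// with identical logic but different session-specific generated class names.
final class ComparatorWrapper {
    final String generatedClassName; // e.g. "StreamExecSortComparator$579"
    final String generatedCode;      // the generated comparison logic

    ComparatorWrapper(String generatedClassName, String generatedCode) {
        this.generatedClassName = generatedClassName;
        this.generatedCode = generatedCode;
    }

    // Flawed equality: the session-specific class name participates in the
    // comparison, so a restored comparator never matches the new one.
    boolean flawedEquals(ComparatorWrapper other) {
        return generatedClassName.equals(other.generatedClassName)
                && generatedCode.equals(other.generatedCode);
    }

    // Sketch of a fix: compare only the generated code, with the volatile
    // class name normalized to a placeholder first.
    boolean nameAgnosticEquals(ComparatorWrapper other) {
        return normalize(generatedCode, generatedClassName)
                .equals(normalize(other.generatedCode, other.generatedClassName));
    }

    private static String normalize(String code, String name) {
        return code.replace(name, "<comparator>");
    }
}
```

With this model, a comparator snapshotted as `StreamExecSortComparator$579` and regenerated as `StreamExecSortComparator$626` fails `flawedEquals` but passes `nameAgnosticEquals`, which matches the symptom that the serializers are reported incompatible although they are in fact compatible.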
[jira] [Commented] (FLINK-16788) ElasticSearch Connector SQL DDL add optional config (eg: enable-auth/username/password)
[ https://issues.apache.org/jira/browse/FLINK-16788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17081809#comment-17081809 ] Weike Dong commented on FLINK-16788: Hi [~zhisheng], this is a good feature proposal. Personally I think 'connector.enable-auth' option could become optional or removed, e.g. if users specify username and password, then definitely they want to enable authentication. > ElasticSearch Connector SQL DDL add optional config (eg: > enable-auth/username/password) > --- > > Key: FLINK-16788 > URL: https://issues.apache.org/jira/browse/FLINK-16788 > Project: Flink > Issue Type: Improvement > Components: Connectors / ElasticSearch, Table SQL / API >Affects Versions: 1.10.0 >Reporter: zhisheng >Priority: Major > Fix For: 1.11.0 > > > In a production environment, accessing elasticsearch usually requires > authentication, and requires a username and password to access it, but the > current version of SQL DDL does not support users to configure these > parameters. > > I have improve it in our company, and we use it as follows: > > CREATE TABLE user_behavior_es ( > user_idBIGINT, > item_id BIGINT > ) WITH ( > 'connector.type'='elasticsearch', > 'connector.version'='7', > 'connector.hosts'='http://localhost:9200', > 'connector.index'='user_behavior_es', > 'connector.document-type'='user_behavior_es', > 'connector.enable-auth'='true', > 'connector.username'='zhisheng', > 'connector.password'='123456', > 'format.type'='json', > 'update-mode'='append', > 'connector.bulk-flush.max-actions'='10' > ) -- This message was sent by Atlassian Jira (v8.3.4#803005)
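The suggestion above (drop `'connector.enable-auth'` and infer authentication from the presence of credentials) can be sketched with a small hypothetical helper; this is not actual Flink connector API, just an illustration of the inference rule:

```java
import java.util.Map;

// Hypothetical helper: authentication is considered enabled exactly when both
// a username and a password are supplied, so no separate
// 'connector.enable-auth' flag is needed.
final class EsAuthOptions {
    static boolean authEnabled(Map<String, String> props) {
        return !props.getOrDefault("connector.username", "").isEmpty()
                && !props.getOrDefault("connector.password", "").isEmpty();
    }
}
```

Under this rule, a DDL that specifies both `'connector.username'` and `'connector.password'` implicitly enables authentication, and one that omits them disables it.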
[jira] [Updated] (FLINK-16626) Prevent REST handler from being closed more than once
[ https://issues.apache.org/jira/browse/FLINK-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weike Dong updated FLINK-16626: --- Description: In the Flink 1.10.0 release, job cancellation can be problematic, as users frequently experience java.util.concurrent.TimeoutException at the client side because the REST endpoint closes prematurely, before sending out the response; this is because the jobCancellationHandler is incorrectly reused and closed twice. When executing the following command to stop a Flink job in YARN per-job mode, the client keeps retrying until timeout (1 minute) and exits with failure, even though the job actually stops successfully. Command : {noformat} flink cancel $jobId yid appId {noformat} The exception on the client side is: {quote}{quote} {quote} 2020-03-17 12:32:13,709 DEBUG org.apache.flink.runtime.rest.RestClient - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel ... 2020-03-17 12:33:11,065 DEBUG org.apache.flink.runtime.rest.RestClient - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint. 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel 2020-03-17 12:33:14,077 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache - Freed 2 thread-local buffer(s) from thread: flink-rest-client-netty-thread-1 2020-03-17 12:33:14,080 DEBUG org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete. 
2020-03-17 12:33:14,083 ERROR org.apache.flink.client.cli.CliFrontend - Error while running the command. org.apache.flink.util.FlinkException: Could not cancel job cc61033484d4c0e7a27a8a2a36f4de7a. at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545) at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843) at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) Caused by: java.util.concurrent.TimeoutException at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543) ... 9 more The program finished with the following exception: org.apache.flink.util.FlinkException: Could not cancel job cc61033484d4c0e7a27a8a2a36f4de7a. 
at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545) at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843) at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) Caused by: java.util.concurrent.TimeoutException at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543) ... 9 more {quote} Actually, the job was cancelled. But the server also prints some exception: {quote}2020-03-17 12:25:13,754 ERROR [flink-akka.actor.default-dispatcher-17] org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:766) - Failed to submit a listener notification task. Event loop shut down? java.util.concurrent.RejectedExecutionException: event executor terminated at
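The double-close described in this issue (a shared handler closed from two shutdown paths) is conventionally prevented by making close idempotent. The following is a minimal sketch under that assumption, using hypothetical class names rather than Flink's actual REST handler classes:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch of an idempotent close guard: even if close() is reached
// from two code paths, the shutdown logic runs exactly once.
class IdempotentHandler {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    int closeCount = 0; // exposed only to make the behavior observable here

    void close() {
        // compareAndSet flips false -> true atomically; only the first caller
        // wins, so resources are released a single time.
        if (closed.compareAndSet(false, true)) {
            closeCount++; // real code would release channels, executors, etc.
        }
    }
}
```

With such a guard, a second close call becomes a harmless no-op instead of tearing down an endpoint that still has a pending response to send.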
[jira] [Updated] (FLINK-16626) Prevent REST handler from being closed more than once
[ https://issues.apache.org/jira/browse/FLINK-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weike Dong updated FLINK-16626: --- Summary: Prevent REST handler from being closed more than once (was: Exception encountered when cancelling a job in yarn per-job mode) > Prevent REST handler from being closed more than once > - > > Key: FLINK-16626 > URL: https://issues.apache.org/jira/browse/FLINK-16626 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.10.0 >Reporter: chaiyongqiang >Assignee: Weike Dong >Priority: Blocker > Fix For: 1.10.1, 1.11.0 > > Attachments: jobmanager.log, patch.diff > > > When executing the following command to stop a flink job with yarn per-job > mode, the client keeps retrying untill timeout (1 minutes)and exit with > failure. But the job stops successfully. > Command : > {noformat} > flink cancel $jobId yid appId > {noformat} > The exception on the client side is : > {quote}bq. > 2020-03-17 12:32:13,709 DEBUG org.apache.flink.runtime.rest.RestClient - > Sending request of class class > org.apache.flink.runtime.rest.messages.EmptyRequestBody to > ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel > ... > 2020-03-17 12:33:11,065 DEBUG org.apache.flink.runtime.rest.RestClient - > Sending request of class class > org.apache.flink.runtime.rest.messages.EmptyRequestBody to > ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel > 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - > Shutting down rest endpoint. 
> 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - > Sending request of class class > org.apache.flink.runtime.rest.messages.EmptyRequestBody to > ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel > 2020-03-17 12:33:14,077 DEBUG > org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache - Freed 2 > thread-local buffer(s) from thread: flink-rest-client-netty-thread-1 > 2020-03-17 12:33:14,080 DEBUG org.apache.flink.runtime.rest.RestClient - Rest > endpoint shutdown complete. > 2020-03-17 12:33:14,083 ERROR org.apache.flink.client.cli.CliFrontend - Error > while running the command. > org.apache.flink.util.FlinkException: Could not cancel job > cc61033484d4c0e7a27a8a2a36f4de7a. > at > org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545) > at > org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843) > at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) > Caused by: java.util.concurrent.TimeoutException > at > java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) > at > org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543) > ... 
9 more > > The program finished with the following exception: > org.apache.flink.util.FlinkException: Could not cancel job > cc61033484d4c0e7a27a8a2a36f4de7a. > at > org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545) > at > org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843) > at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) > Caused by: java.util.concurrent.TimeoutException > at > java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) > at >
[jira] [Commented] (FLINK-16626) Exception encountered when cancelling a job in yarn per-job mode
[ https://issues.apache.org/jira/browse/FLINK-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072429#comment-17072429 ] Weike Dong commented on FLINK-16626: Thanks [~liyu] [~tison] for watching this issue, and I will try my best to fix and test it thoroughly : ) > Exception encountered when cancelling a job in yarn per-job mode > > > Key: FLINK-16626 > URL: https://issues.apache.org/jira/browse/FLINK-16626 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.10.0 >Reporter: chaiyongqiang >Assignee: Weike Dong >Priority: Blocker > Fix For: 1.10.1, 1.11.0 > > Attachments: jobmanager.log, patch.diff > > > When executing the following command to stop a flink job with yarn per-job > mode, the client keeps retrying untill timeout (1 minutes)and exit with > failure. But the job stops successfully. > Command : > {noformat} > flink cancel $jobId yid appId > {noformat} > The exception on the client side is : > {quote}bq. > 2020-03-17 12:32:13,709 DEBUG org.apache.flink.runtime.rest.RestClient - > Sending request of class class > org.apache.flink.runtime.rest.messages.EmptyRequestBody to > ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel > ... > 2020-03-17 12:33:11,065 DEBUG org.apache.flink.runtime.rest.RestClient - > Sending request of class class > org.apache.flink.runtime.rest.messages.EmptyRequestBody to > ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel > 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - > Shutting down rest endpoint. 
> 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - > Sending request of class class > org.apache.flink.runtime.rest.messages.EmptyRequestBody to > ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel > 2020-03-17 12:33:14,077 DEBUG > org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache - Freed 2 > thread-local buffer(s) from thread: flink-rest-client-netty-thread-1 > 2020-03-17 12:33:14,080 DEBUG org.apache.flink.runtime.rest.RestClient - Rest > endpoint shutdown complete. > 2020-03-17 12:33:14,083 ERROR org.apache.flink.client.cli.CliFrontend - Error > while running the command. > org.apache.flink.util.FlinkException: Could not cancel job > cc61033484d4c0e7a27a8a2a36f4de7a. > at > org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545) > at > org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843) > at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) > Caused by: java.util.concurrent.TimeoutException > at > java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) > at > org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543) > ... 
9 more > > The program finished with the following exception: > org.apache.flink.util.FlinkException: Could not cancel job > cc61033484d4c0e7a27a8a2a36f4de7a. > at > org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545) > at > org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843) > at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) > Caused by: java.util.concurrent.TimeoutException > at > java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) > at
[jira] [Commented] (FLINK-16626) Exception encountered when cancelling a job in yarn per-job mode
[ https://issues.apache.org/jira/browse/FLINK-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072311#comment-17072311 ] Weike Dong commented on FLINK-16626: Hi [~tison], I am quite interested in solving this issue and have been following this for a while, so I guess I can help fix this bug : ) > Exception encountered when cancelling a job in yarn per-job mode > > > Key: FLINK-16626 > URL: https://issues.apache.org/jira/browse/FLINK-16626 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.10.0 >Reporter: chaiyongqiang >Priority: Major > Fix For: 1.10.1, 1.11.0 > > Attachments: jobmanager.log, patch.diff > > > When executing the following command to stop a flink job with yarn per-job > mode, the client keeps retrying untill timeout (1 minutes)and exit with > failure. But the job stops successfully. > Command : > {noformat} > flink cancel $jobId yid appId > {noformat} > The exception on the client side is : > {quote}bq. > 2020-03-17 12:32:13,709 DEBUG org.apache.flink.runtime.rest.RestClient - > Sending request of class class > org.apache.flink.runtime.rest.messages.EmptyRequestBody to > ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel > ... > 2020-03-17 12:33:11,065 DEBUG org.apache.flink.runtime.rest.RestClient - > Sending request of class class > org.apache.flink.runtime.rest.messages.EmptyRequestBody to > ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel > 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - > Shutting down rest endpoint. 
> 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient -
> Sending request of class class
> org.apache.flink.runtime.rest.messages.EmptyRequestBody to
> ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
> 2020-03-17 12:33:14,077 DEBUG
> org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache - Freed 2
> thread-local buffer(s) from thread: flink-rest-client-netty-thread-1
> 2020-03-17 12:33:14,080 DEBUG org.apache.flink.runtime.rest.RestClient - Rest
> endpoint shutdown complete.
> 2020-03-17 12:33:14,083 ERROR org.apache.flink.client.cli.CliFrontend - Error
> while running the command.
> org.apache.flink.util.FlinkException: Could not cancel job
> cc61033484d4c0e7a27a8a2a36f4de7a.
>     at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>     at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>     at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
>     at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>     at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>     ... 9 more
>
> The program finished with the following exception:
> org.apache.flink.util.FlinkException: Could not cancel job
> cc61033484d4c0e7a27a8a2a36f4de7a.
>     at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>     at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>     at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
>     at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>     at
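The client-side failure above bottoms out in `CompletableFuture.get(timeout)` throwing a `TimeoutException`, which `CliFrontend` wraps into a `FlinkException`. A minimal, self-contained sketch of that failure mode (this is illustrative, not Flink's actual client code; the never-completed future and the 100 ms deadline are made up for the demo):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CancelTimeoutDemo {
    public static void main(String[] args) throws Exception {
        // Stands in for a cancel request whose acknowledgement never
        // reaches the client, even though the job stops on the cluster side.
        CompletableFuture<Void> cancelFuture = new CompletableFuture<>();
        try {
            // Block with a deadline instead of waiting forever.
            cancelFuture.get(100, TimeUnit.MILLISECONDS);
            System.out.println("cancelled");
        } catch (TimeoutException e) {
            // This is the TimeoutException seen in the stack trace above.
            System.out.println("timed out waiting for cancellation");
        }
    }
}
```

This reproduces why the CLI can report failure while the job actually stopped: the timeout only says the client never saw the response, not that the cancellation itself failed.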
[jira] [Commented] (FLINK-16708) When a JDBC connection has been closed, the retry policy of the JDBCUpsertOutputFormat cannot take effect and may result in data loss
[ https://issues.apache.org/jira/browse/FLINK-16708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17064580#comment-17064580 ] Weike Dong commented on FLINK-16708:

Hi [~lzljs3620320], do you have time to look into this issue? Thanks

> When a JDBC connection has been closed, the retry policy of the
> JDBCUpsertOutputFormat cannot take effect and may result in data loss
>
> Key: FLINK-16708
> URL: https://issues.apache.org/jira/browse/FLINK-16708
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 1.10.0
> Reporter: tangshangwen
> Assignee: tangshangwen
> Priority: Major
>
> In our test environment, I used the tcpkill command to simulate a scenario
> where the PostgreSQL connection was closed. I found that the retry strategy
> of the flush method did not take effect: on the second attempt it could not
> recognize that the connection had been closed, because the batchStatements of
> PgStatement had already been cleared before the first connection check. As a
> result, the second execution sees an empty batchStatements and returns
> normally.
> {code:java}
> 2020-03-20 21:16:18.246 [jdbc-upsert-output-format-thread-1] ERROR
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC executeBatch
> error, retry times = 1
> org.postgresql.util.PSQLException: This connection has been closed.
>     at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
>     at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
>     at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
>     at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
>     at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
>     at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
>     at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
>     at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> 2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC executeBatch
> error, retry times = 1
> org.postgresql.util.PSQLException: This connection has been closed.
>     at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
>     at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
>     at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
>     at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
>     at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
>     at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
>     at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
>     at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
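The failure mode the report describes — the driver clears its batch on the first failed attempt, so a naive retry of executeBatch sees an empty batch and "succeeds", silently dropping rows — can be sketched with a toy stand-in. `FakeStatement` below is hypothetical (it is not the PostgreSQL driver), but it mimics the relevant behavior of PgStatement clearing batchStatements:

```java
import java.util.ArrayList;
import java.util.List;

public class FlushRetryDemo {
    /** Hypothetical stand-in for a JDBC statement whose batch is cleared by executeBatch, even on failure. */
    static class FakeStatement {
        final List<String> batch = new ArrayList<>();
        int failuresLeft;
        FakeStatement(int failuresLeft) { this.failuresLeft = failuresLeft; }
        int executeBatch() throws Exception {
            if (batch.isEmpty()) {
                return 0; // empty batch "succeeds" -- the crux of the bug
            }
            int size = batch.size();
            batch.clear(); // batch is gone regardless of the outcome below
            if (failuresLeft-- > 0) {
                throw new Exception("This connection has been closed.");
            }
            return size;
        }
    }

    public static void main(String[] args) {
        // Buggy pattern: retry executeBatch directly. The second attempt sees
        // an empty batch and returns 0, so the retry "works" but data is lost.
        FakeStatement buggy = new FakeStatement(1);
        buggy.batch.add("row-1");
        int written = 0;
        for (int attempt = 1; attempt <= 3; attempt++) {
            try { written = buggy.executeBatch(); break; }
            catch (Exception e) { /* retry */ }
        }
        System.out.println("buggy pattern wrote " + written + " rows");

        // Safer pattern: keep the rows in our own buffer and rebuild the
        // batch before every attempt, so a retry replays the full batch.
        FakeStatement safe = new FakeStatement(1);
        List<String> buffer = List.of("row-1");
        written = 0;
        for (int attempt = 1; attempt <= 3; attempt++) {
            try {
                safe.batch.clear();
                safe.batch.addAll(buffer); // replay the buffered rows
                written = safe.executeBatch();
                break;
            } catch (Exception e) { /* retry */ }
        }
        System.out.println("safe pattern wrote " + written + " rows");
    }
}
```

The fix direction this suggests is that the output format, not the driver, must own the pending rows across retries.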
[jira] [Commented] (FLINK-7129) Support dynamically changing CEP patterns
[ https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647591#comment-16647591 ] Weike Dong commented on FLINK-7129:

I am also curious about the current progress on this issue : )

> Support dynamically changing CEP patterns
>
> Key: FLINK-7129
> URL: https://issues.apache.org/jira/browse/FLINK-7129
> Project: Flink
> Issue Type: New Feature
> Components: CEP
> Reporter: Dawid Wysakowicz
> Assignee: Dawid Wysakowicz
> Priority: Major
>
> An umbrella task for introducing a mechanism for injecting patterns through
> coStream

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
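The coStream idea the ticket mentions — one input carrying events, a second input carrying pattern updates, with the operator swapping its active pattern at runtime — can be sketched outside Flink in plain Java. Everything below is illustrative (a `Predicate` standing in for a CEP pattern, a plain class standing in for a two-input operator); Flink's real mechanism would use connected or broadcast streams:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class DynamicPatternDemo {
    /** Hypothetical stand-in for a two-input (co-processed) operator holding a mutable pattern. */
    static class CoPatternOperator {
        private Predicate<String> pattern = s -> false; // match nothing until a pattern arrives

        // Control input: replaces the active pattern at runtime.
        void processPattern(Predicate<String> newPattern) { this.pattern = newPattern; }

        // Data input: evaluates the current pattern against an event.
        boolean processEvent(String event) { return pattern.test(event); }
    }

    public static void main(String[] args) {
        CoPatternOperator op = new CoPatternOperator();
        List<String> matches = new ArrayList<>();

        // The control stream injects an initial pattern...
        op.processPattern(s -> s.startsWith("error"));
        for (String e : List.of("error-1", "warn-1")) {
            if (op.processEvent(e)) matches.add(e);
        }
        // ...and later replaces it without restarting the job.
        op.processPattern(s -> s.startsWith("warn"));
        for (String e : List.of("error-2", "warn-2")) {
            if (op.processEvent(e)) matches.add(e);
        }
        System.out.println(matches); // [error-1, warn-2]
    }
}
```

The hard part the umbrella task hides is not this dispatch logic but migrating the partially-matched NFA state when a pattern changes mid-stream.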
[jira] [Commented] (FLINK-9915) Add TO_BASE64 function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16552446#comment-16552446 ] Weike Dong commented on FLINK-9915:

Hi vino, I strongly support this feature, as the built-in functions in Flink Table / SQL are relatively few and lack many useful string transformation functions. However, many users with experience of Hive SQL would prefer Hive-style function names like "BASE64" or "UNBASE64", so the naming of the function could be discussed. Another point to consider is the parameter type of TO_BASE64: whether a binary array is accepted, or only a string. But anyway, +1 for the feature : )

> Add TO_BASE64 function for table/sql API
>
> Key: FLINK-9915
> URL: https://issues.apache.org/jira/browse/FLINK-9915
> Project: Flink
> Issue Type: New Feature
> Components: Table API SQL
> Reporter: vinoyang
> Assignee: vinoyang
> Priority: Minor
>
> Refer to the MySQL TO_BASE64 function:
> https://dev.mysql.com/doc/refman/5.6/en/string-functions.html#function_to-base64

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
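The semantics being proposed, including the string-vs-binary parameter question raised in the comment, can be illustrated with `java.util.Base64` from the JDK. The helper names (`toBase64`, `fromBase64`) and the decision to encode strings as UTF-8 are assumptions for the sketch, not the eventual Flink API:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class ToBase64Demo {
    // Hypothetical sketch: accept either a string (encoded as UTF-8 bytes)
    // or a raw byte array, mirroring the parameter-type question above.
    static String toBase64(String s) {
        return toBase64(s.getBytes(StandardCharsets.UTF_8));
    }
    static String toBase64(byte[] bytes) {
        return Base64.getEncoder().encodeToString(bytes);
    }
    // Counterpart in the Hive-style naming (UNBASE64) mentioned in the comment.
    static String fromBase64(String s) {
        return new String(Base64.getDecoder().decode(s), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(toBase64("hello world"));        // aGVsbG8gd29ybGQ=
        System.out.println(fromBase64("aGVsbG8gd29ybGQ=")); // hello world
    }
}
```

Accepting both overloads sidesteps the string-vs-binary debate at the cost of having to pin down one character encoding for the string case.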
[jira] [Commented] (FLINK-8353) Add support for timezones
[ https://issues.apache.org/jira/browse/FLINK-8353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493639#comment-16493639 ] Weike Dong commented on FLINK-8353:

I strongly support these features. Preferably there could be a way to set a specific timezone for a particular job, so that all subsequent temporal processing could be based on it. As users' input data are often collected from other systems that do not follow the rule assumed by Flink (UTC+0), some temporal UDFs are currently needed to perform such transformations, which adds complexity to the whole system, especially for watermark generation or for writing processing time to an external database.

> Add support for timezones
>
> Key: FLINK-8353
> URL: https://issues.apache.org/jira/browse/FLINK-8353
> Project: Flink
> Issue Type: New Feature
> Components: Table API SQL
> Reporter: Timo Walther
> Priority: Major
>
> This is an umbrella issue for adding support for timezones in the Table & SQL
> API.
> Usually companies work with different timezones simultaneously. We could add
> support for the new time classes introduced with Java 8 and enable our scalar
> functions to also work with those (or some custom time class implementations
> like those from Calcite). We need a good design for this to address most of
> the problems users face related to timestamps and timezones.
> It is up for discussion how to ship date, time, and timestamp instances through
> the cluster.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
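The per-job timezone the comment asks for boils down to one conversion: timestamps flow through the system as absolute instants (epoch millis, effectively UTC), and only at the edges are they rendered in a configured zone. A sketch with the Java 8 time classes the issue mentions (the "per-job timezone" setting and the Asia/Shanghai choice are hypothetical):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class JobTimezoneDemo {
    public static void main(String[] args) {
        // Event timestamps arrive as epoch millis, i.e. an absolute instant.
        Instant eventTime = Instant.ofEpochMilli(0L); // 1970-01-01T00:00:00Z

        // Hypothetical per-job timezone setting; here UTC+8.
        ZoneId jobZone = ZoneId.of("Asia/Shanghai");

        // Rendering at the edge: same instant, displayed in the job's zone.
        ZonedDateTime local = eventTime.atZone(jobZone);
        System.out.println(local.format(
                DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        // 1970-01-01 08:00:00
    }
}
```

Keeping the internal representation as an instant is what makes watermark generation zone-independent; only formatting and parsing at sources and sinks need the configured zone.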