[jira] [Commented] (FLINK-29084) Program argument containing # (pound sign) mistakenly truncated in Kubernetes mode

2022-08-24 Thread Weike Dong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17584207#comment-17584207
 ] 

Weike Dong commented on FLINK-29084:


Oh yes, but that issue is still unresolved, so we probably need to address this 
one to prevent further potential "bugs" like this :D

> Program argument containing # (pound sign) mistakenly truncated in Kubernetes 
> mode
> --
>
> Key: FLINK-29084
> URL: https://issues.apache.org/jira/browse/FLINK-29084
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.13.6, 1.14.5, 1.15.1
> Environment: Flink 1.13.6
> Native Kubernetes (Application Mode)
>Reporter: Weike Dong
>Priority: Minor
>
> We have found that when submitting jobs in native Kubernetes mode, the main 
> arguments of a Flink program are truncated if they contain a # character.
> For example, if we pass 'ab#cd' as an argument to a Flink program, Flink 
> actually gets only 'ab' from the variable 
> `$internal.application.program-args` at runtime.
> After looking into the code, we found the likely cause: when 
> `org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator#buildAccompanyingKubernetesResources`
>  transforms the Flink config data `Map` into a `ConfigMap`, the fabric8 
> Kubernetes client converts it to YAML internally without any escaping. 
> Afterwards, when a # character appears in a YAML line, the decoder treats it 
> as the start of a comment, so the substring after the # character is 
> erroneously ignored.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29084) Program argument containing # (pound sign) mistakenly truncated in Kubernetes mode

2022-08-23 Thread Weike Dong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17583530#comment-17583530
 ] 

Weike Dong commented on FLINK-29084:


One possible solution is to base64-encode the program arguments and decode them 
later, but we may need to change the _flink-clients_ module as well, e.g. 
_org.apache.flink.client.deployment.application.ApplicationConfiguration#applyToConfiguration_
 and 
_org.apache.flink.client.deployment.application.ApplicationConfiguration#fromConfiguration_.

Hi [~wangyang0918], do you have time to take a look at this issue? Thanks : )
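The base64 round trip proposed above can be sketched as follows. This is a minimal illustration only, not the actual Flink patch; the helper names are hypothetical:

```python
import base64

def encode_args(args):
    """Encode each program argument so no raw '#' (or other
    YAML-sensitive character) ever reaches the ConfigMap."""
    return [base64.b64encode(a.encode("utf-8")).decode("ascii") for a in args]

def decode_args(encoded):
    """Inverse operation, applied when the arguments are read back."""
    return [base64.b64decode(a).decode("utf-8") for a in encoded]

args = ["ab#cd", "--conf", "key: value # not a comment"]
encoded = encode_args(args)
assert all("#" not in e for e in encoded)  # the base64 alphabet has no '#'
assert decode_args(encoded) == args        # lossless round trip
```

Since the base64 alphabet (A-Z, a-z, 0-9, +, /, =) contains no # character, encoded values survive the YAML round trip unmodified.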



[jira] [Created] (FLINK-29084) Program argument containing # (pound sign) mistakenly truncated in Kubernetes mode

2022-08-23 Thread Weike Dong (Jira)
Weike Dong created FLINK-29084:
--

 Summary: Program argument containing # (pound sign) mistakenly 
truncated in Kubernetes mode
 Key: FLINK-29084
 URL: https://issues.apache.org/jira/browse/FLINK-29084
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Kubernetes
Affects Versions: 1.15.1, 1.14.5, 1.13.6
 Environment: Flink 1.13.6

Native Kubernetes (Application Mode)
Reporter: Weike Dong


We have found that when submitting jobs in native Kubernetes mode, the main 
arguments of a Flink program are truncated if they contain a # character.

For example, if we pass 'ab#cd' as an argument to a Flink program, Flink 
actually gets only 'ab' from the variable `$internal.application.program-args` 
at runtime.

After looking into the code, we found the likely cause: when 
`org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator#buildAccompanyingKubernetesResources`
transforms the Flink config data `Map` into a `ConfigMap`, the fabric8 
Kubernetes client converts it to YAML internally without any escaping. 
Afterwards, when a # character appears in a YAML line, the decoder treats it as 
the start of a comment, so the substring after the # character is erroneously 
ignored.
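The truncation described above can be reproduced with a toy comment-stripping, line-based config loader. This is an illustrative sketch only; Flink's actual config-parsing code differs:

```python
def load_config_line(line: str) -> tuple[str, str]:
    """Naive 'key: value' parser that, like the decoder described
    above, treats everything after '#' as a comment."""
    line = line.split("#", 1)[0]        # drop '#' and everything after it
    key, _, value = line.partition(":")
    return key.strip(), value.strip()

key, value = load_config_line("$internal.application.program-args: ab#cd")
assert value == "ab"  # 'cd' was silently lost
```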





[jira] [Commented] (FLINK-28757) Increase precision of TIME fields in JSON Format

2022-07-31 Thread Weike Dong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17573526#comment-17573526
 ] 

Weike Dong commented on FLINK-28757:


Related to FLINK-17525 and FLINK-19872, but still not fully resolved.

> Increase precision of TIME fields in JSON Format
> 
>
> Key: FLINK-28757
> URL: https://issues.apache.org/jira/browse/FLINK-28757
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.13.6, 1.14.5, 1.15.1
>Reporter: Weike Dong
>Priority: Minor
>
> Currently, TIME fields in the JSON format only support a precision of zero 
> decimal places, like '12:33:16', regardless of the precision of the original 
> data ingested from the sources.
> However, since Flink internally uses an Integer to store the values of TIME 
> fields, the precision could easily be extended to 3 decimal places, like 
> '12:33:16.235', which has been confirmed by switching to other formats or by 
> modifying 
> _org.apache.flink.formats.json.JsonToRowDataConverters#convertToTime_.
> Hence I propose to extend the precision of TIME fields in the JSON format 
> from 0 to 3.





[jira] [Created] (FLINK-28757) Increase precision of TIME fields in JSON Format

2022-07-31 Thread Weike Dong (Jira)
Weike Dong created FLINK-28757:
--

 Summary: Increase precision of TIME fields in JSON Format
 Key: FLINK-28757
 URL: https://issues.apache.org/jira/browse/FLINK-28757
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.15.1, 1.14.5, 1.13.6
Reporter: Weike Dong


Currently, TIME fields in the JSON format only support a precision of zero 
decimal places, like '12:33:16', regardless of the precision of the original 
data ingested from the sources.

However, since Flink internally uses an Integer to store the values of TIME 
fields, the precision could easily be extended to 3 decimal places, like 
'12:33:16.235', which has been confirmed by switching to other formats or by 
modifying _org.apache.flink.formats.json.JsonToRowDataConverters#convertToTime_.

Hence I propose to extend the precision of TIME fields in the JSON format from 
0 to 3.
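Since a TIME value is internally an integer count of milliseconds of the day, formatting it with three fractional digits loses nothing. A sketch of the idea (not Flink's converter code):

```python
def format_time_millis(millis_of_day: int) -> str:
    """Render a milliseconds-of-day integer as HH:MM:SS.mmm."""
    seconds, ms = divmod(millis_of_day, 1000)
    hours, rem = divmod(seconds, 3600)
    minutes, secs = divmod(rem, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}.{ms:03d}"

# 12:33:16.235 stored as an int of milliseconds-of-day:
value = (12 * 3600 + 33 * 60 + 16) * 1000 + 235
assert format_time_millis(value) == "12:33:16.235"
```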





[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData

2022-07-25 Thread Weike Dong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weike Dong updated FLINK-28674:
---
Attachment: (was: image-2022-07-25-20-56-14-111.png)

> EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in 
> BinaryRowData
> --
>
> Key: FLINK-28674
> URL: https://issues.apache.org/jira/browse/FLINK-28674
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.6, 1.14.5, 1.15.1
> Environment: Flink 1.13.6
>Reporter: Weike Dong
>Priority: Major
>
> UPDATE: Not a problem





[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData

2022-07-25 Thread Weike Dong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weike Dong updated FLINK-28674:
---
Description: UPDATE: Not a problem  (was: Hi Devs,

Recently I have discovered that the _equaliser.equals_ call in 
_org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_
 generates wrong comparison results when two binary rows are the same, like

!image-2022-07-25-20-56-14-111.png!

After digging through the generated code for this equaliser, I found that when 
the two input RowData are both instances of _BinaryRowData_, the 
_BinaryRowData#equals_ method is called directly to give the comparison result.

!image-2022-07-25-20-59-31-933.png!

However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot 
properly handle complex data types like _Timestamp_, so it returns _false_ 
even when the actual timestamp values are the same, causing 
_SinkUpsertMaterializer_ to falsely think that there are no matches in the 
states, hence printing errors like "The state is cleared because of state ttl", 
which eventually leads to the loss of -U data in the final results.

P.S. the _equals_ method of _BinaryRowData_ actually compares the underlying 
MemorySegments.

!image-2022-07-25-21-17-33-608.png!)
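The suspected failure mode (a byte-wise row comparison disagreeing with a field-aware comparison) can be illustrated in miniature. This standalone sketch uses a made-up layout, not Flink's actual BinaryRowData format:

```python
import struct

# Two encodings of the same logical row: the timestamp (in millis) is
# identical, but unused trailing bytes differ, e.g. stale padding left
# over from copying the row out of a larger buffer.
ts_millis = 1_658_751_196_235
row_a = struct.pack(">q", ts_millis) + b"\x00\x00"
row_b = struct.pack(">q", ts_millis) + b"\x00\x01"

def bytes_equal(a: bytes, b: bytes) -> bool:
    """What a raw memory-segment comparison does."""
    return a == b

def timestamp_equal(a: bytes, b: bytes) -> bool:
    """What a field-aware equaliser does: decode the field, then compare."""
    return struct.unpack_from(">q", a)[0] == struct.unpack_from(">q", b)[0]

assert not bytes_equal(row_a, row_b)   # byte-wise: "different"
assert timestamp_equal(row_a, row_b)   # semantically: equal
```

The point is only that two byte-for-byte-different buffers can encode the same logical values, so a correct equaliser must compare decoded fields rather than raw segments.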



[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData

2022-07-25 Thread Weike Dong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weike Dong updated FLINK-28674:
---
Attachment: (was: image-2022-07-25-20-59-31-933.png)



[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData

2022-07-25 Thread Weike Dong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weike Dong updated FLINK-28674:
---
Attachment: (was: image-2022-07-25-21-17-33-608.png)



[jira] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData

2022-07-25 Thread Weike Dong (Jira)


[ https://issues.apache.org/jira/browse/FLINK-28674 ]


Weike Dong deleted comment on FLINK-28674:


was (Author: kyledong):
Hi [~lzljs3620320] [~airblader] , could you please have a look at this issue? 
Thank you : )

> EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in 
> BinaryRowData
> --
>
> Key: FLINK-28674
> URL: https://issues.apache.org/jira/browse/FLINK-28674
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.6, 1.14.5, 1.15.1
> Environment: Flink 1.13.6
>Reporter: Weike Dong
>Priority: Major
> Attachments: image-2022-07-25-20-56-14-111.png, 
> image-2022-07-25-20-59-31-933.png, image-2022-07-25-21-17-33-608.png
>
>
> Hi Devs,
> Recently I have discovered that the _equaliser.equals_ call in 
> _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_
>  generates wrong comparison results when two binary rows are the same, like
> !image-2022-07-25-20-56-14-111.png!
> After digging through the generated code for this equaliser, I found that 
> when the two input RowData are both instances of _BinaryRowData_, the 
> _BinaryRowData#equals_ method is called directly to give the comparison 
> result.
> !image-2022-07-25-20-59-31-933.png!
> However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot 
> properly handle complex data types like _Timestamp_, so it returns 
> _false_ even when the actual timestamp values are the same, causing 
> _SinkUpsertMaterializer_ to falsely think that there are no matches in the 
> states, hence printing errors like "The state is cleared because of state 
> ttl", which eventually leads to the loss of -U data in the final results.
> P.S. the _equals_ method of _BinaryRowData_ actually compares the underlying 
> MemorySegments.
> !image-2022-07-25-21-17-33-608.png!





[jira] [Comment Edited] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData

2022-07-25 Thread Weike Dong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17570903#comment-17570903
 ] 

Weike Dong edited comment on FLINK-28674 at 7/25/22 1:25 PM:
-

Actually caused by missing timestamp settings in CDC connector options, not a 
problem in Equaliser.


was (Author: kyledong):
Actually caused by missing timestamp settings in CDC connector options.



[jira] [Closed] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData

2022-07-25 Thread Weike Dong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weike Dong closed FLINK-28674.
--
Resolution: Not A Problem

Actually caused by missing timestamp settings in CDC connector options.



[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData

2022-07-25 Thread Weike Dong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weike Dong updated FLINK-28674:
---
Attachment: image-2022-07-25-21-17-33-608.png



[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData

2022-07-25 Thread Weike Dong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weike Dong updated FLINK-28674:
---
Description: 
Hi Devs,

Recently I have discovered that the _equaliser.equals_ call in 
_org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_
 generates wrong comparison results when two binary rows are the same, like

!image-2022-07-25-20-56-14-111.png!

After digging through the generated code for this equaliser, I have found that 
when the two input RowData are both instances of _BinaryRowData_, the 
_BinaryRowData#equals_ method is directly called to give the comparison result. 

!image-2022-07-25-20-59-31-933.png!

However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot 
properly handle complex data types like _Timestamp_, so it returns _false_ 
even when the actual timestamp values are the same, causing 
_SinkUpsertMaterializer_ to falsely think that there are no matches in the 
states, hence printing errors like "The state is cleared because of state ttl", 
which eventually leads to the loss of -U data in the final results.

 

P.S. the _equals_ method of _BinaryRowData_ actually compares the underlying 
MemorySegments,

!image-2022-07-25-21-17-33-608.png!

  was:
Hi Devs,

Recently I have discovered that the _equaliser.equals_ call in 
_org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_
 generates wrong comparison results when two binary rows are the same, like

!image-2022-07-25-20-56-14-111.png!

After digging through the generated code for this equaliser, I have found that 
when the two input RowData are both instances of _BinaryRowData_, the 
_BinaryRowData#equals_ method is directly called to give the comparison result. 

!image-2022-07-25-20-59-31-933.png!

However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot 
properly handle complex data types like _Timestamp_, so it returns _false_ 
even when the actual timestamp values are the same, causing 
SinkUpsertMaterializer to falsely think that there are no matches in the 
states, hence printing errors like "The state is cleared because of state ttl", 
which eventually leads to the loss of -U data in the final results.

 

P.S. the equals method of BinaryRowData actually compares the underlying 
MemorySegments, which is not suitable for types like Timestamp




[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData

2022-07-25 Thread Weike Dong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weike Dong updated FLINK-28674:
---
Description: 
Hi Devs,

Recently I have discovered that the _equaliser.equals_ call in 
_org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_
 generates wrong comparison results when two binary rows are the same, like

!image-2022-07-25-20-56-14-111.png!

After digging through the generated code for this equaliser, I have found that 
when the two input RowData are both instances of _BinaryRowData_, the 
_BinaryRowData#equals_ method is directly called to give the comparison result. 

!image-2022-07-25-20-59-31-933.png!

However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot 
properly handle complex data types like _Timestamp_, so it returns _false_ 
even when the actual timestamp values are the same, causing 
SinkUpsertMaterializer to falsely think that there are no matches in the 
states, hence printing errors like "The state is cleared because of state ttl", 
which eventually leads to the loss of -U data in the final results.

 

P.S. the equals method of BinaryRowData actually compares the underlying 
MemorySegments, which is not suitable for types like Timestamp

  was:
Hi Devs,

Recently I have discovered that the _equaliser.equals_ call in 
_org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_
 generates wrong comparison results when two binary rows are the same, like

!image-2022-07-25-20-56-14-111.png!

After digging through the generated code for this equaliser, I have found that 
when the two input RowData are both instances of _BinaryRowData_, the 
_BinaryRowData#equals_ method is directly called to give the comparison result. 

!image-2022-07-25-20-59-31-933.png!

However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot 
properly handle complex data types like _Timestamp_, so it returns _false_ 
even when the actual timestamp values are the same, causing 
SinkUpsertMaterializer to falsely think that there are no matches in the 
states, hence printing errors like "The state is cleared because of state ttl", 
which eventually leads to the loss of -U data in the final results.




[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData

2022-07-25 Thread Weike Dong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weike Dong updated FLINK-28674:
---
Description: 
Hi Devs,

Recently I have discovered that the _equaliser.equals_ call in 
_org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_
 generates wrong comparison results when two binary rows are the same, like

!image-2022-07-25-20-56-14-111.png!

After digging through the generated code for this equaliser, I have found that 
when the two input RowData are both instances of _BinaryRowData_, the 
_BinaryRowData#equals_ method is directly called to give the comparison result. 

!image-2022-07-25-20-59-31-933.png!

However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot 
properly handle complex data types like _Timestamp_, so it returns _false_ 
even when the actual timestamp values are the same, causing 
SinkUpsertMaterializer to falsely think that there are no matches in the 
states, hence printing errors like "The state is cleared because of state ttl", 
which eventually leads to the loss of -U data in the final results.

  was:
Hi Devs,

Recently I have discovered that the _equaliser.equals_ call in 
_org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_
 generates wrong comparison results when two binary rows are the same, like

!image-2022-07-25-20-56-14-111.png!

After digging through the generated code for this equaliser, I have found that 
when the two input RowData are both instances of _BinaryRowData_, the 
_BinaryRowData#equals_ method is directly called to give the comparison result.

!image-2022-07-25-20-59-31-933.png!

However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot 
properly handle complex data types like _Timestamp_, so it returns _false_ 
even when the actual timestamp values are the same, causing 
SinkUpsertMaterializer to falsely think that there are no matches in the 
states, hence printing errors like "The state is cleared because of state ttl", 
which eventually leads to the loss of -U data in the final results.




[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData

2022-07-25 Thread Weike Dong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weike Dong updated FLINK-28674:
---
Description: 
Hi Devs,

Recently I have discovered that the _equaliser.equals_ call in 
_org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_
 generates wrong comparison results when two binary rows are the same, like

!image-2022-07-25-20-56-14-111.png!

After digging through the generated code for this equaliser, I have found that 
when the two input RowData are all instances of {_}BinaryRowData{_}, the 
_BinaryRowData#equals_ method is directly called to give the comparison result.

!image-2022-07-25-20-59-31-933.png!

However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot 
properly handle complex data types like {_}Timestamp{_}, so it returns _false_ 
even when the actual timestamp values are the same, causing 
SinkUpsertMaterializer to falsely think that there are no matches in the 
states, hence printing errors like "The state is cleared because of state ttl", 
which eventually leads to the loss of -U data in the final results.

  was:
Hi Devs,

Recently I have discovered that the _equaliser.equals_ call in 
_org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_
 generates wrong comparison results when two binary rows are the same, like

!image-2022-07-25-20-56-14-111.png!

After digging through the generated code for this equaliser, I have found that 
when the two input RowData are all instances of {_}BinaryRowData{_}, thus 
_BinaryRowData#equals_ method is directly called to compare the two rows.

!image-2022-07-25-20-59-31-933.png!

However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot 
properly handle complex data types like {_}Timestamp{_}, so it returns _false_ 
even when the actual timestamp values are the same, causing 
SinkUpsertMaterializer to falsely think that there are no matches in the 
states, hence printing errors like "The state is cleared because of state ttl", 
which eventually leads to the loss of -U data in the final results.


> EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in 
> BinaryRowData
> --
>
> Key: FLINK-28674
> URL: https://issues.apache.org/jira/browse/FLINK-28674
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.6, 1.14.5, 1.15.1
> Environment: Flink 1.13.6
>Reporter: Weike Dong
>Priority: Major
> Attachments: image-2022-07-25-20-56-14-111.png, 
> image-2022-07-25-20-59-31-933.png
>
>
> Hi Devs,
> Recently I have discovered that the _equaliser.equals_ call in 
> _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_
>  generates wrong comparison results when two binary rows are the same, like
> !image-2022-07-25-20-56-14-111.png!
> After digging through the generated code for this equaliser, I have found 
> that when the two input RowData are all instances of {_}BinaryRowData{_}, the 
> _BinaryRowData#equals_ method is directly called to give the comparison 
> result.
> !image-2022-07-25-20-59-31-933.png!
> However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot 
> properly handle complex data types like {_}Timestamp{_}, so it returns 
> _false_ even when the actual timestamp values are the same, causing 
> SinkUpsertMaterializer to falsely think that there are no matches in the 
> states, hence printing errors like "The state is cleared because of state 
> ttl", which eventually leads to the loss of -U data in the final results.





[jira] [Commented] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData

2022-07-25 Thread Weike Dong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17570890#comment-17570890
 ] 

Weike Dong commented on FLINK-28674:


Hi [~lzljs3620320] [~airblader] , could you please have a look at this issue? 
Thank you : )

> EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in 
> BinaryRowData
> --
>
> Key: FLINK-28674
> URL: https://issues.apache.org/jira/browse/FLINK-28674
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.6, 1.14.5, 1.15.1
> Environment: Flink 1.13.6
>Reporter: Weike Dong
>Priority: Major
> Attachments: image-2022-07-25-20-56-14-111.png, 
> image-2022-07-25-20-59-31-933.png
>
>
> Hi Devs,
> Recently I have discovered that the _equaliser.equals_ call in 
> _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_
>  generates wrong comparison results when two binary rows are the same, like
> !image-2022-07-25-20-56-14-111.png!
> After digging through the generated code for this equaliser, I have found 
> that when the two input RowData are all instances of {_}BinaryRowData{_}, 
> thus _BinaryRowData#equals_ method is directly called to compare the two rows.
> !image-2022-07-25-20-59-31-933.png!
> However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot 
> properly handle complex data types like {_}Timestamp{_}, so it returns 
> _false_ even when the actual timestamp values are the same, causing 
> SinkUpsertMaterializer to falsely think that there are no matches in the 
> states, hence printing errors like "The state is cleared because of state 
> ttl", which eventually leads to the loss of -U data in the final results.





[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData

2022-07-25 Thread Weike Dong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weike Dong updated FLINK-28674:
---
Description: 
Hi Devs,

Recently I have discovered that the _equaliser.equals_ call in 
_org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_
 generates wrong comparison results when two binary rows are the same, like

!image-2022-07-25-20-56-14-111.png!

After digging through the generated code for this equaliser, I have found that 
when the two input RowData are all instances of {_}BinaryRowData{_}, thus 
_BinaryRowData#equals_ method is directly called to compare the two rows.

!image-2022-07-25-20-59-31-933.png!

However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot 
properly handle complex data types like {_}Timestamp{_}, so it returns _false_ 
even when the actual timestamp values are the same, causing 
SinkUpsertMaterializer to falsely think that there are no matches in the 
states, hence printing errors like "The state is cleared because of state ttl", 
which eventually leads to the loss of -U data in the final results.

  was:
Hi Devs,

Recently I have discovered that the _equaliser.equals_ call in 
_org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_
 generates wrong comparison results when two binary rows are the same, like

!image-2022-07-25-20-56-14-111.png!

After digging through the generated code for this equaliser, I have found that 
when the two input RowData are all instances of {_}BinaryRowData{_}, thus 
_BinaryRowData#equals_ method is directly called to compare the two rows.

!image-2022-07-25-20-59-31-933.png!

However, as you can see  in the first snapshot, _BinaryRowData#equals_ cannot 
properly handle complex data types like {_}Timestamp{_}, so it returns _false_ 
even when the actual timestamp values are the same.

 


> EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in 
> BinaryRowData
> --
>
> Key: FLINK-28674
> URL: https://issues.apache.org/jira/browse/FLINK-28674
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.6, 1.14.5, 1.15.1
> Environment: Flink 1.13.6
>Reporter: Weike Dong
>Priority: Major
> Attachments: image-2022-07-25-20-56-14-111.png, 
> image-2022-07-25-20-59-31-933.png
>
>
> Hi Devs,
> Recently I have discovered that the _equaliser.equals_ call in 
> _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_
>  generates wrong comparison results when two binary rows are the same, like
> !image-2022-07-25-20-56-14-111.png!
> After digging through the generated code for this equaliser, I have found 
> that when the two input RowData are all instances of {_}BinaryRowData{_}, 
> thus _BinaryRowData#equals_ method is directly called to compare the two rows.
> !image-2022-07-25-20-59-31-933.png!
> However, as you can see in the first snapshot, _BinaryRowData#equals_ cannot 
> properly handle complex data types like {_}Timestamp{_}, so it returns 
> _false_ even when the actual timestamp values are the same, causing 
> SinkUpsertMaterializer to falsely think that there are no matches in the 
> states, hence printing errors like "The state is cleared because of state 
> ttl", which eventually leads to the loss of -U data in the final results.





[jira] [Updated] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData

2022-07-25 Thread Weike Dong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weike Dong updated FLINK-28674:
---
Description: 
Hi Devs,

Recently I have discovered that the _equaliser.equals_ call in 
_org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_
 generates wrong comparison results when two binary rows are the same, like

!image-2022-07-25-20-56-14-111.png!

After digging through the generated code for this equaliser, I have found that 
when the two input RowData are all instances of {_}BinaryRowData{_}, thus 
_BinaryRowData#equals_ method is directly called to compare the two rows.

!image-2022-07-25-20-59-31-933.png!

However, as you can see  in the first snapshot, _BinaryRowData#equals_ cannot 
properly handle complex data types like {_}Timestamp{_}, so it returns _false_ 
even when the actual timestamp values are the same.

 

  was:
Hi Devs,

Recently I have discovered that the _equaliser.equals_ call in 
_org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_
 generates wrong comparison results when two binary rows are the same, like

!image-2022-07-25-20-56-14-111.png!

After digging through the generated code for this equaliser, I have found that 
when the two input RowData are all instances of {_}BinaryRowData{_}, then 
_BinaryRowData#equals_ method is called to compare the two rows.

!image-2022-07-25-20-59-31-933.png!

However, as you can see  in the first snapshot, _BinaryRowData#equals_ cannot 
properly handle complex data types like {_}Timestamp{_}, so it returns _false_ 
even when the actual timestamp values are the same.

 


> EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in 
> BinaryRowData
> --
>
> Key: FLINK-28674
> URL: https://issues.apache.org/jira/browse/FLINK-28674
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.6, 1.14.5, 1.15.1
> Environment: Flink 1.13.6
>Reporter: Weike Dong
>Priority: Major
> Attachments: image-2022-07-25-20-56-14-111.png, 
> image-2022-07-25-20-59-31-933.png
>
>
> Hi Devs,
> Recently I have discovered that the _equaliser.equals_ call in 
> _org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_
>  generates wrong comparison results when two binary rows are the same, like
> !image-2022-07-25-20-56-14-111.png!
> After digging through the generated code for this equaliser, I have found 
> that when the two input RowData are all instances of {_}BinaryRowData{_}, 
> thus _BinaryRowData#equals_ method is directly called to compare the two rows.
> !image-2022-07-25-20-59-31-933.png!
> However, as you can see  in the first snapshot, _BinaryRowData#equals_ cannot 
> properly handle complex data types like {_}Timestamp{_}, so it returns 
> _false_ even when the actual timestamp values are the same.
>  





[jira] [Created] (FLINK-28674) EqualiserCodeGenerator generates wrong equaliser for Timestamp fields in BinaryRowData

2022-07-25 Thread Weike Dong (Jira)
Weike Dong created FLINK-28674:
--

 Summary: EqualiserCodeGenerator generates wrong equaliser for 
Timestamp fields in BinaryRowData
 Key: FLINK-28674
 URL: https://issues.apache.org/jira/browse/FLINK-28674
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.15.1, 1.14.5, 1.13.6
 Environment: Flink 1.13.6
Reporter: Weike Dong
 Attachments: image-2022-07-25-20-56-14-111.png, 
image-2022-07-25-20-59-31-933.png

Hi Devs,

Recently I have discovered that the _equaliser.equals_ call in 
_org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#removeFirst_
 generates wrong comparison results when two binary rows are the same, like

!image-2022-07-25-20-56-14-111.png!

After digging through the generated code for this equaliser, I have found that 
when the two input RowData are all instances of {_}BinaryRowData{_}, then 
_BinaryRowData#equals_ method is called to compare the two rows.

!image-2022-07-25-20-59-31-933.png!

However, as you can see  in the first snapshot, _BinaryRowData#equals_ cannot 
properly handle complex data types like {_}Timestamp{_}, so it returns _false_ 
even when the actual timestamp values are the same.

 





[jira] [Created] (FLINK-19677) TaskManager takes abnormally long time to register with JobManager on Kubernetes

2020-10-16 Thread Weike Dong (Jira)
Weike Dong created FLINK-19677:
--

 Summary: TaskManager takes abnormally long time to register with 
JobManager on Kubernetes
 Key: FLINK-19677
 URL: https://issues.apache.org/jira/browse/FLINK-19677
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.11.2, 1.11.1, 1.11.0
Reporter: Weike Dong


During the registration process of a TaskManager, the JobManager creates a 
_TaskManagerLocation_ instance, which tries to obtain the hostname of the 
TaskManager via a reverse DNS lookup.

However, this always fails in Kubernetes environments, because the IPs of pods 
that are not exposed by Services cannot be resolved to domain names by coredns, 
and _InetAddress#getCanonicalHostName()_ takes ~5 seconds to return, blocking 
the whole registration process.

Therefore Flink should provide a configuration parameter to turn off the 
reverse DNS lookup. Also, even when the hostname is actually needed, the lookup 
could be performed lazily to avoid blocking the registration of other 
TaskManagers.
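The lazy-lookup idea suggested above can be sketched as follows. This is a hypothetical illustration, not the actual Flink class: the constructor stores only the address and defers the potentially slow `InetAddress#getCanonicalHostName()` call until the hostname is first requested, caching the result.

```java
import java.net.InetAddress;

/**
 * Hypothetical sketch of lazy hostname resolution. Constructing the location
 * never performs DNS I/O; the reverse lookup (which may block for ~5 s when
 * the IP is unresolvable, as on Kubernetes pods without a Service) happens
 * only on first use of getHostName(), and the result is cached.
 */
public class LazyTaskManagerLocation {
    private final InetAddress address;
    private volatile String hostName; // resolved lazily, cached afterwards

    public LazyTaskManagerLocation(InetAddress address) {
        this.address = address; // constructor returns immediately; no DNS call
    }

    /** Reverse DNS lookup happens only when the hostname is first requested. */
    public String getHostName() {
        String h = hostName;
        if (h == null) {
            synchronized (this) {
                if (hostName == null) {
                    // Falls back to the textual IP if the reverse lookup fails.
                    hostName = address.getCanonicalHostName();
                }
                h = hostName;
            }
        }
        return h;
    }

    public String getAddress() {
        return address.getHostAddress(); // always cheap, never does DNS
    }

    public static void main(String[] args) {
        LazyTaskManagerLocation loc =
                new LazyTaskManagerLocation(InetAddress.getLoopbackAddress());
        System.out.println("address  = " + loc.getAddress());  // no DNS involved
        System.out.println("hostname = " + loc.getHostName()); // lookup on demand
    }
}
```

With this shape, registration of other TaskManagers is never blocked behind a slow lookup, and callers that never need the hostname never pay for it.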



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration

2020-07-13 Thread Weike Dong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17156587#comment-17156587
 ] 

Weike Dong commented on FLINK-18452:


Yes, I agree that we can fix this bug in the next major release, i.e. 1.12.0. 
Since there are currently no other user reports of this problem, it does not 
seem urgent enough to change the existing class structure.

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent 
> state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.10.0, 1.10.1, 1.11.0
>Reporter: Weike Dong
>Assignee: Weike Dong
>Priority: Major
> Fix For: 1.12.0
>
> Attachments: 
> c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
>
>
> We found that in SQL jobs using "Top-N" functionality provided by the blink 
> planner, the job state cannot be retrieved because of "incompatible" state 
> serializers (in fact they are compatible).
> The error log is displayed like below
> {panel:title=taskmanager.log}
> 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], 
> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], 
> partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, 
> serverid,  quantity]) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  
> - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], 
> orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) 
> (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Error while getting state
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
> at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
> at 
> org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer cannot be incompatible.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
> at 
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
> ... 13 more{panel}
>  
> After careful debugging, it is found to be an issue with the compatibility 
> check of type serializers.
>  
> In short, during checkpointing, Flink serializes 

[jira] [Updated] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration

2020-07-13 Thread Weike Dong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weike Dong updated FLINK-18452:
---
Fix Version/s: 1.12.0

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent 
> state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.10.0, 1.10.1, 1.11.0
>Reporter: Weike Dong
>Assignee: Weike Dong
>Priority: Major
> Fix For: 1.12.0
>
> Attachments: 
> c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
>
>
> We found that in SQL jobs using "Top-N" functionality provided by the blink 
> planner, the job state cannot be retrieved because of "incompatible" state 
> serializers (in fact they are compatible).
> The error log is displayed like below
> {panel:title=taskmanager.log}
> 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], 
> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], 
> partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, 
> serverid,  quantity]) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  
> - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], 
> orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) 
> (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Error while getting state
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
> at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
> at 
> org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer cannot be incompatible.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
> at 
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
> ... 13 more{panel}
>  
> After careful debugging, it is found to be an issue with the compatibility 
> check of type serializers.
>  
> In short, during checkpointing, Flink serializes _SortedMapSerializer_ by 
> creating a _SortedMapSerializerSnapshot_ object, and the original comparator 
> is encapsulated within the object (here we call it 
> _StreamExecSortComparator$579_).
>  
> At restoration, the 

[jira] [Commented] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration

2020-06-30 Thread Weike Dong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148388#comment-17148388
 ] 

Weike Dong commented on FLINK-18452:


One important thing to note here is backward compatibility. For example, 
_GeneratedRecordComparator_ instances created in current or earlier Flink 
versions do not, AFAIK, carry the meta info needed to compare them with new 
instances that do.

In order to maintain compatibility, a hacky approach is to extract the relevant 
fields from the generated code text; however, this is error-prone and should 
probably only be used as a fallback when no other method is available.

Alternatively, we could add a new class (e.g. _GeneratedRecordComparatorV2_) with 
the required meta info, and "migrate" the current implementation to the new one 
where needed. That way, we could enumerate all of the comparator code-generation 
implementations in existing Flink versions and provide a robust migration plan.
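The "V2 with meta info" idea can be sketched roughly as below. All names here are illustrative, not actual Flink classes: the point is that compatibility is decided by comparing the sort spec the comparator was generated from (key indices and directions), not the generated source text, which can differ between two otherwise-equivalent generations (e.g. different generated class-name suffixes).

```java
import java.util.Arrays;
import java.util.Objects;

/**
 * Hypothetical sketch: a generated comparator that carries its sort spec as
 * meta info. equals()/hashCode() ignore the generated code text entirely and
 * compare only the meta info, so two generations of the same spec compare
 * equal even though their source text differs.
 */
public class GeneratedRecordComparatorV2Sketch {
    final String code;          // generated source; NOT used for equality
    final int[] sortKeys;       // field indices the comparator sorts on
    final boolean[] ascending;  // sort direction per key

    GeneratedRecordComparatorV2Sketch(String code, int[] sortKeys, boolean[] ascending) {
        this.code = code;
        this.sortKeys = sortKeys;
        this.ascending = ascending;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof GeneratedRecordComparatorV2Sketch)) {
            return false;
        }
        GeneratedRecordComparatorV2Sketch other = (GeneratedRecordComparatorV2Sketch) o;
        // Compatibility is decided by the meta info only.
        return Arrays.equals(sortKeys, other.sortKeys)
                && Arrays.equals(ascending, other.ascending);
    }

    @Override
    public int hashCode() {
        return Objects.hash(Arrays.hashCode(sortKeys), Arrays.hashCode(ascending));
    }

    public static void main(String[] args) {
        // Same sort spec, different generated class names / code text:
        GeneratedRecordComparatorV2Sketch before = new GeneratedRecordComparatorV2Sketch(
                "class Cmp$579 { ... }", new int[]{2}, new boolean[]{false});
        GeneratedRecordComparatorV2Sketch after = new GeneratedRecordComparatorV2Sketch(
                "class Cmp$613 { ... }", new int[]{2}, new boolean[]{false});
        System.out.println(before.equals(after)); // true: compatible despite different code
    }
}
```

Restored-state compatibility checks built on such an equals() would no longer reject serializers that merely regenerated the same comparator under a new class name.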

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent 
> state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.10.0, 1.10.1, 1.11.0
>Reporter: Weike Dong
>Priority: Major
> Attachments: 
> c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
>
>
> We found that in SQL jobs using "Top-N" functionality provided by the blink 
> planner, the job state cannot be retrieved because of "incompatible" state 
> serializers (in fact they are compatible).
> The error log is displayed like below
> {panel:title=taskmanager.log}
> 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], 
> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], 
> partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, 
> serverid,  quantity]) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  
> - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], 
> orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) 
> (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Error while getting state
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
> at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
> at 
> org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer cannot be incompatible.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
> at 
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> at 

[jira] [Commented] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration

2020-06-30 Thread Weike Dong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148348#comment-17148348
 ] 

Weike Dong commented on FLINK-18452:


Thank you [~jark] for the reply. I agree that the meta info given to 
_org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator#gen_ is 
more suitable for the comparison (there may be other classes with the same 
problem).

I am quite interested in solving this issue; could you please assign this 
ticket to me? Thank you very much : )

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent 
> state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.10.0, 1.10.1, 1.11.0
>Reporter: Weike Dong
>Priority: Major
> Attachments: 
> c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
>
>
> We found that in SQL jobs using "Top-N" functionality provided by the blink 
> planner, the job state cannot be retrieved because of "incompatible" state 
> serializers (in fact they are compatible).
> The error log is displayed like below
> {panel:title=taskmanager.log}
> 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], 
> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], 
> partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, 
> serverid,  quantity]) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  
> - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], 
> orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) 
> (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Error while getting state
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
> at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
> at 
> org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer cannot be incompatible.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
> at 
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
> ... 13 more{panel}
>  
> After careful debugging, it is found to be an issue with the compatibility 

[jira] [Comment Edited] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration

2020-06-29 Thread Weike Dong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148289#comment-17148289
 ] 

Weike Dong edited comment on FLINK-18452 at 6/30/20, 3:46 AM:
--

Hi [~jinyu.zj], would you please be so kind to look at this issue, as you are 
the original author for the code. Thank you very much.


was (Author: kyledong):
Hi [~jinyu.zj], would you please be so kind to look at this issus, as you are 
the original author for the code. Thank you very much.

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent 
> state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.10.0, 1.10.1, 1.11.0
>Reporter: Weike Dong
>Priority: Major
> Attachments: 
> c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
>
>
> We found that in SQL jobs using "Top-N" functionality provided by the blink 
> planner, the job state cannot be retrieved because of "incompatible" state 
> serializers (in fact they are compatible).
> After careful debugging, it is found to be an issue with the compatibility 
> check of type 

[jira] [Commented] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration

2020-06-29 Thread Weike Dong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148289#comment-17148289
 ] 

Weike Dong commented on FLINK-18452:


Hi [~jinyu.zj], would you please be so kind as to look at this issue, as you are 
the original author of the code. Thank you very much.

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent 
> state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.10.0, 1.10.1, 1.11.0
>Reporter: Weike Dong
>Priority: Major
> Attachments: 
> c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
>
>
> We found that in SQL jobs using "Top-N" functionality provided by the blink 
> planner, the job state cannot be retrieved because of "incompatible" state 
> serializers (in fact they are compatible).
> After careful debugging, it is found to be an issue with the compatibility 
> check of type serializers.
>  
> In short, during checkpointing, Flink serializes _SortedMapSerializer_ by 
> creating a _SortedMapSerializerSnapshot_ object, and the original comparator 
> is encapsulated within the object 

[jira] [Created] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration

2020-06-29 Thread Weike Dong (Jira)
Weike Dong created FLINK-18452:
--

 Summary: Flaws in RetractableTopNFunction.ComparatorWrapper#equals 
method prevent state access after restoration
 Key: FLINK-18452
 URL: https://issues.apache.org/jira/browse/FLINK-18452
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.10.1, 1.10.0, 1.11.0
Reporter: Weike Dong
 Attachments: 
c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png

We found that in SQL jobs using "Top-N" functionality provided by the blink 
planner, the job state cannot be retrieved because of "incompatible" state 
serializers (in fact they are compatible).

The error log is displayed like below
{panel:title=taskmanager.log}
2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], 
rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], 
partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, 
serverid,  quantity]) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - 
Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, 
rankEnd=100], partitionBy=[appkey, serverid], orderBy=[quantity DESC], 
select=[appkey, serverid, oid, quantity]) (1/1) 
(bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
java.lang.RuntimeException: Error while getting state
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
at 
org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: The new state 
serializer cannot be incompatible.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
at 
org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 13 more{panel}
 
After careful debugging, it is found to be an issue with the compatibility 
check of type serializers.
 
In short, during checkpointing, Flink serializes _SortedMapSerializer_ by 
creating a _SortedMapSerializerSnapshot_ object, and the original comparator is 
encapsulated within the object (here we call it _StreamExecSortComparator$579_).
 
At restoration, the object is read and restored as normal. However, during the 
construction of RetractableTopNFunction instance, another Comparator is 
provided by Flink as an argument (we call it _StreamExecSortComparator$626_), 
and it is later used in the _ValueStateDescriptor_ which acts like a key to the 
state store.
 
Here comes the problem: when the newly-restored Flink program tries to access 
state (_getState_) through the previously mentioned _ValueStateDescriptor_, the 
State Backend firstly 
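The flawed equality check described in this issue can be illustrated with a minimal, hypothetical sketch (these are not Flink's actual classes): two code-generated comparators with identical compare logic but different generated class names — e.g. StreamExecSortComparator$579 vs. StreamExecSortComparator$626 — are reported unequal by a name-based equals, which is what makes the restored serializer look "incompatible" even though the two comparators behave identically.

```java
import java.util.Comparator;

public class ComparatorEqualsSketch {
    // Stand-in for a code-generated comparator whose equality is judged by
    // its generated class name, as the reported ComparatorWrapper#equals flaw
    // effectively does. Illustrative only, not Flink's real implementation.
    static final class GeneratedComparator implements Comparator<Integer> {
        final String generatedClassName; // differs per codegen round
        GeneratedComparator(String name) { this.generatedClassName = name; }

        @Override
        public int compare(Integer a, Integer b) {
            return b.compareTo(a); // identical descending logic in both instances
        }

        @Override
        public boolean equals(Object o) {
            // Flawed check: behavior is the same, but the generated names differ
            return o instanceof GeneratedComparator
                && ((GeneratedComparator) o).generatedClassName.equals(generatedClassName);
        }

        @Override
        public int hashCode() { return generatedClassName.hashCode(); }
    }

    public static void main(String[] args) {
        GeneratedComparator restored = new GeneratedComparator("StreamExecSortComparator$579");
        GeneratedComparator fresh    = new GeneratedComparator("StreamExecSortComparator$626");
        // Behaviorally compatible...
        System.out.println(restored.compare(1, 2) == fresh.compare(1, 2)); // prints true
        // ...yet reported unequal, which is what surfaces as StateMigrationException
        System.out.println(restored.equals(fresh)); // prints false
    }
}
```

Comparing on the generated meta information (as suggested in the comments) instead of the generated class name would make the two instances compare equal.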

[jira] [Commented] (FLINK-16788) ElasticSearch Connector SQL DDL add optional config (eg: enable-auth/username/password)

2020-04-12 Thread Weike Dong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17081809#comment-17081809
 ] 

Weike Dong commented on FLINK-16788:


Hi [~zhisheng], this is a good feature proposal. Personally I think the 
'connector.enable-auth' option could be made optional or removed; e.g., if users 
specify a username and password, they clearly intend to enable authentication.

> ElasticSearch Connector SQL DDL add optional config (eg: 
> enable-auth/username/password)
> ---
>
> Key: FLINK-16788
> URL: https://issues.apache.org/jira/browse/FLINK-16788
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch, Table SQL / API
>Affects Versions: 1.10.0
>Reporter: zhisheng
>Priority: Major
> Fix For: 1.11.0
>
>
> In a production environment, accessing Elasticsearch usually requires 
> authentication with a username and password, but the current version of the 
> SQL DDL does not allow users to configure these parameters.
>  
> I have improved this in our company, and we use it as follows:
>  
> CREATE TABLE user_behavior_es (
>     user_id BIGINT,
>     item_id BIGINT
> ) WITH (
>     'connector.type'='elasticsearch',
>     'connector.version'='7',
>     'connector.hosts'='http://localhost:9200',
>     'connector.index'='user_behavior_es',
>     'connector.document-type'='user_behavior_es',
>     'connector.enable-auth'='true',
>     'connector.username'='zhisheng',
>     'connector.password'='123456',
>     'format.type'='json',
>     'update-mode'='append',
>     'connector.bulk-flush.max-actions'='10'
> )
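The comment above suggests inferring authentication from the presence of credentials instead of a separate flag. A minimal sketch of that idea (illustrative names only; this is not Flink's actual connector-option API):

```java
import java.util.Map;

public class EsAuthOptionSketch {
    // Hypothetical rule: authentication is considered enabled whenever both
    // 'connector.username' and 'connector.password' are supplied, making a
    // separate 'connector.enable-auth' flag unnecessary.
    static boolean authEnabled(Map<String, String> options) {
        String user = options.get("connector.username");
        String pass = options.get("connector.password");
        return user != null && !user.isEmpty() && pass != null && !pass.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(authEnabled(Map.of(
            "connector.username", "zhisheng",
            "connector.password", "123456"))); // prints true
        System.out.println(authEnabled(Map.of())); // prints false
    }
}
```

With this rule, supplying both credentials is sufficient to turn authentication on, and omitting them leaves it off.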



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16626) Prevent REST handler from being closed more than once

2020-04-05 Thread Weike Dong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weike Dong updated FLINK-16626:
---
Description: 
In the Flink 1.10.0 release, job cancellation can be problematic: users 
frequently experience a java.util.concurrent.TimeoutException on the client 
side because the REST endpoint closes prematurely, before sending out the 
response. This happens because the jobCancellationHandler is incorrectly 
reused and closed twice.
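A minimal sketch of the guard implied by this fix (assumed class and method names, not Flink's actual REST handler code): making close() idempotent so a handler that is registered for several routes cannot be shut down twice.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class IdempotentHandlerSketch {
    // Stand-in for a REST handler shared across registrations; only the first
    // close() performs shutdown, later calls are no-ops.
    static final class RestHandler implements AutoCloseable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final CompletableFuture<Void> termination = new CompletableFuture<>();
        int closeCalls = 0; // counts effective shutdowns, for the demo only

        @Override
        public void close() {
            // compareAndSet ensures exactly one caller wins the close race
            if (closed.compareAndSet(false, true)) {
                closeCalls++;
                termination.complete(null);
            }
        }

        boolean isClosed() { return closed.get(); }
    }

    public static void main(String[] args) {
        RestHandler handler = new RestHandler();
        handler.close();
        handler.close(); // second close via another registration path: no-op
        System.out.println(handler.isClosed() + " " + handler.closeCalls); // prints true 1
    }
}
```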
 
When executing the following command to stop a Flink job in YARN per-job 
mode, the client keeps retrying until timeout (1 minute) and exits with 
failure, even though the job stops successfully.
 Command :
{noformat}
flink cancel $jobId yid appId
{noformat}
 The exception on the client side is :
{quote}
2020-03-17 12:32:13,709 DEBUG org.apache.flink.runtime.rest.RestClient - 
Sending request of class class 
org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
 ...
 2020-03-17 12:33:11,065 DEBUG org.apache.flink.runtime.rest.RestClient - 
Sending request of class class 
org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - 
Shutting down rest endpoint.
 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - 
Sending request of class class 
org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
 2020-03-17 12:33:14,077 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache - Freed 2 
thread-local buffer(s) from thread: flink-rest-client-netty-thread-1
 2020-03-17 12:33:14,080 DEBUG org.apache.flink.runtime.rest.RestClient - Rest 
endpoint shutdown complete.
 2020-03-17 12:33:14,083 ERROR org.apache.flink.client.cli.CliFrontend - Error 
while running the command.
 org.apache.flink.util.FlinkException: Could not cancel job 
cc61033484d4c0e7a27a8a2a36f4de7a.
 at 
org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
 at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
 at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
 at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
 at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
 Caused by: java.util.concurrent.TimeoutException
 at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
 at 
org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
 ... 9 more


 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not cancel job 
cc61033484d4c0e7a27a8a2a36f4de7a.
 at 
org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
 at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
 at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
 at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
 at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
 Caused by: java.util.concurrent.TimeoutException
 at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
 at 
org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
 ... 9 more
{quote}
Actually, the job was cancelled, but the server also printed some exceptions:

 
{quote}2020-03-17 12:25:13,754 ERROR [flink-akka.actor.default-dispatcher-17] 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:766)
 - Failed to submit a listener notification task. Event loop shut down? 
java.util.concurrent.RejectedExecutionException: event executor terminated at 

[jira] [Updated] (FLINK-16626) Prevent REST handler from being closed more than once

2020-04-05 Thread Weike Dong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weike Dong updated FLINK-16626:
---
Summary: Prevent REST handler from being closed more than once  (was: 
Exception encountered when cancelling a job in yarn per-job mode)

> Prevent REST handler from being closed more than once
> -
>
> Key: FLINK-16626
> URL: https://issues.apache.org/jira/browse/FLINK-16626
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.10.0
>Reporter: chaiyongqiang
>Assignee: Weike Dong
>Priority: Blocker
> Fix For: 1.10.1, 1.11.0
>
> Attachments: jobmanager.log, patch.diff
>
>
> When executing the following command to stop a flink job with yarn per-job 
> mode, the client keeps retrying untill timeout (1 minutes)and exit with 
> failure. But the job stops successfully.
>  Command :
> {noformat}
> flink cancel $jobId yid appId
> {noformat}

[jira] [Commented] (FLINK-16626) Exception encountered when cancelling a job in yarn per-job mode

2020-04-01 Thread Weike Dong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072429#comment-17072429
 ] 

Weike Dong commented on FLINK-16626:


Thanks [~liyu]  [~tison] for watching this issue, and I will try my best to fix 
and test it thoroughly : )

> Exception encountered when cancelling a job in yarn per-job mode
> 
>
> Key: FLINK-16626
> URL: https://issues.apache.org/jira/browse/FLINK-16626
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.10.0
>Reporter: chaiyongqiang
>Assignee: Weike Dong
>Priority: Blocker
> Fix For: 1.10.1, 1.11.0
>
> Attachments: jobmanager.log, patch.diff
>
>
> When executing the following command to stop a flink job with yarn per-job 
> mode, the client keeps retrying untill timeout (1 minutes)and exit with 
> failure. But the job stops successfully.
>  Command :
> {noformat}
> flink cancel $jobId yid appId
> {noformat}

[jira] [Commented] (FLINK-16626) Exception encountered when cancelling a job in yarn per-job mode

2020-03-31 Thread Weike Dong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072311#comment-17072311
 ] 

Weike Dong commented on FLINK-16626:


Hi [~tison], I am quite interested in solving this issue and have been 
following this for a while, so I guess I can help fix this bug : )

> Exception encountered when cancelling a job in yarn per-job mode
> 
>
> Key: FLINK-16626
> URL: https://issues.apache.org/jira/browse/FLINK-16626
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.10.0
>Reporter: chaiyongqiang
>Priority: Major
> Fix For: 1.10.1, 1.11.0
>
> Attachments: jobmanager.log, patch.diff
>
>
> When executing the following command to stop a Flink job in YARN per-job 
> mode, the client keeps retrying until the timeout (1 minute) and exits with 
> failure, even though the job itself stops successfully.
>  Command :
> {noformat}
> flink cancel $jobId -yid $appId
> {noformat}
>  The exception on the client side is :
> {quote}
> 2020-03-17 12:32:13,709 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Sending request of class class 
> org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
> ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
> ...
> 2020-03-17 12:33:11,065 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Sending request of class class 
> org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
> ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
> 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Shutting down rest endpoint.
> 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Sending request of class class 
> org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
> ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
> 2020-03-17 12:33:14,077 DEBUG 
> org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache - Freed 2 
> thread-local buffer(s) from thread: flink-rest-client-netty-thread-1
> 2020-03-17 12:33:14,080 DEBUG org.apache.flink.runtime.rest.RestClient - Rest 
> endpoint shutdown complete.
> 2020-03-17 12:33:14,083 ERROR org.apache.flink.client.cli.CliFrontend - Error 
> while running the command.
> org.apache.flink.util.FlinkException: Could not cancel job 
> cc61033484d4c0e7a27a8a2a36f4de7a.
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>  at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>  at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
>  at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>  ... 9 more
> 
>  The program finished with the following exception:
> org.apache.flink.util.FlinkException: Could not cancel job 
> cc61033484d4c0e7a27a8a2a36f4de7a.
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>  at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>  at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
>  at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>  at 
> 
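The client-side `TimeoutException` in the log above comes from `CliFrontend` blocking on a `CompletableFuture` for the cancellation acknowledgement that never completes within the configured timeout. A minimal, self-contained sketch of that failure mode (class and method names here are hypothetical, not Flink code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Reproduces the client-side failure mode: the CLI blocks on a future for the
// cancel acknowledgement and gives up with a TimeoutException when the
// acknowledgement never arrives -- even though the job may already be stopped.
public class CancelTimeout {

    public static String cancelWithTimeout(CompletableFuture<Void> ack,
                                           long timeout, TimeUnit unit) {
        try {
            // Same blocking call as CompletableFuture.get in the stack trace.
            ack.get(timeout, unit);
            return "cancelled";
        } catch (TimeoutException e) {
            // What the CLI reports, regardless of the job's actual state.
            return "timeout";
        } catch (Exception e) {
            return "error";
        }
    }
}
```

This is why the job can stop successfully while the client still exits with a failure: the success of the cancellation and the delivery of its acknowledgement are decoupled.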

[jira] [Commented] (FLINK-16708) When a JDBC connection has been closed, the retry policy of the JDBCUpsertOutputFormat cannot take effect and may result in data loss

2020-03-23 Thread Weike Dong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17064580#comment-17064580
 ] 

Weike Dong commented on FLINK-16708:


Hi [~lzljs3620320] , do you have time to look into this issue? Thanks 

> When a JDBC connection has been closed, the retry policy of the 
> JDBCUpsertOutputFormat cannot take effect and may result in data loss
> -
>
> Key: FLINK-16708
> URL: https://issues.apache.org/jira/browse/FLINK-16708
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.10.0
>Reporter: tangshangwen
>Assignee: tangshangwen
>Priority: Major
>
> In our test environment, I used the tcpkill command to simulate a scenario 
> where the PostgreSQL connection was closed. I found that the retry strategy 
> of the flush method did not take effect: on the second retry it could not 
> recognize that the connection had been closed, because the batchStatements 
> of PgStatement had already been cleared before the first closed-connection 
> check. As a result, the second execution sees an empty batch and returns 
> normally, silently dropping the data.
> {code:java}
> 2020-03-20 21:16:18.246 [jdbc-upsert-output-format-thread-1] ERROR 
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC executeBatch 
> error, retry times = 1
>  org.postgresql.util.PSQLException: This connection has been closed.
>  at 
> org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
>  at 
> org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
>  at 
> org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
>  at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
>  at 
> org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
>  at 
> org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
>  at 
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
>  at 
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
>  2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR 
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC executeBatch 
> error, retry times = 1
>  org.postgresql.util.PSQLException: This connection has been closed.
>  at 
> org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
>  at 
> org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
>  at 
> org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
>  at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
>  at 
> org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
>  at 
> org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
>  at 
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
>  at 
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> {code}
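The data loss described above can be avoided by keeping the buffered rows on the writer side and clearing them only after a confirmed successful flush, so that a retry re-submits the same batch instead of becoming a no-op on the driver's already-cleared batch. A minimal sketch under that assumption (names are hypothetical; this is not the actual JDBCUpsertOutputFormat implementation):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a flush path that owns its buffer, so a retry after a failed
// executeBatch() re-submits the same rows. Hypothetical names; not the
// actual Flink JDBC connector code.
public class RetryingWriter {

    /** Stand-in for the driver call (e.g. PgStatement.executeBatch). */
    interface Batch {
        void execute(List<String> rows) throws Exception;
    }

    private final List<String> buffer = new ArrayList<>();
    private final Batch batch;
    private final int maxRetries;

    public RetryingWriter(Batch batch, int maxRetries) {
        this.batch = batch;
        this.maxRetries = maxRetries;
    }

    public void add(String row) {
        buffer.add(row);
    }

    public void flush() throws Exception {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                // Rebuild the driver-side batch from our buffer on every
                // attempt, so a cleared PgStatement batch cannot turn the
                // retry into a silent no-op.
                batch.execute(new ArrayList<>(buffer));
                buffer.clear(); // clear only after a confirmed success
                return;
            } catch (Exception e) {
                if (attempt == maxRetries) {
                    throw e; // surface the failure instead of dropping data
                }
            }
        }
    }
}
```

The key design point is that success is judged by the writer's own buffer state, not by whether the driver's internal batch happens to be empty.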



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-7129) Support dynamically changing CEP patterns

2018-10-12 Thread Weike Dong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647591#comment-16647591
 ] 

Weike Dong commented on FLINK-7129:
---

I am also curious about the current progress on this issue : )

> Support dynamically changing CEP patterns
> -
>
> Key: FLINK-7129
> URL: https://issues.apache.org/jira/browse/FLINK-7129
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> An umbrella task for introducing a mechanism for injecting patterns through 
> a coStream



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9915) Add TO_BASE64 function for table/sql API

2018-07-23 Thread Weike Dong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16552446#comment-16552446
 ] 

Weike Dong commented on FLINK-9915:
---

Hi vino,

I strongly support this feature, as Flink Table / SQL currently has relatively 
few built-in functions and lacks many useful string transformation functions. 
However, many users with Hive SQL experience would prefer Hive-style function 
names such as "BASE64" or "UNBASE64", so the naming of the function is worth 
discussing. Another point to consider is the parameter type of TO_BASE64: 
whether a binary array is accepted, or only a string.

But anyway, +1 for the feature : )
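For reference, what a TO_BASE64 scalar function would compute on a string input can be sketched with the JDK's own codec (illustrative only; the class and method names and the null-in/null-out behavior are assumptions, not the eventual Flink API):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch of the computation behind a TO_BASE64 scalar function for strings.
// Names and null handling are assumptions, not the eventual Flink API.
public class ToBase64 {

    public static String toBase64(String input) {
        if (input == null) {
            return null; // SQL-style null propagation (assumed)
        }
        // Encode the UTF-8 bytes of the input, matching MySQL's TO_BASE64
        // behavior for string arguments.
        return Base64.getEncoder()
                .encodeToString(input.getBytes(StandardCharsets.UTF_8));
    }
}
```

Accepting a binary array as well would only change the input type; the encoding step is identical.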

> Add TO_BASE64 function for table/sql API
> 
>
> Key: FLINK-9915
> URL: https://issues.apache.org/jira/browse/FLINK-9915
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>
> refer to mysql TO_BASE64 function : 
> https://dev.mysql.com/doc/refman/5.6/en/string-functions.html#function_to-base64



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8353) Add support for timezones

2018-05-29 Thread Weike Dong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493639#comment-16493639
 ] 

Weike Dong commented on FLINK-8353:
---

I strongly support these features. Preferably there would be a way to set a 
specific timezone for a particular job, so that all subsequent temporal 
processing could be based on it. Since users' input data are often collected 
from systems that do not follow Flink's UTC+0 convention, temporal UDFs are 
currently needed to perform such conversions, which adds complexity to the 
whole system, especially for watermark generation or for writing processing 
time to an external database.
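The kind of conversion that currently gets pushed into per-job temporal UDFs can be sketched with java.time (illustrative only; not a Flink API — with a per-job timezone setting this would happen once, centrally):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Illustrative sketch (not a Flink API): rendering an epoch-millis timestamp
// in a job-specific timezone instead of Flink's implicit UTC+0.
public class JobTimeZone {

    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static String format(long epochMillis, String zone) {
        // Interpret the instant in the given zone before formatting, so the
        // rendered wall-clock time matches the user's source system.
        return Instant.ofEpochMilli(epochMillis)
                .atZone(ZoneId.of(zone))
                .format(FMT);
    }
}
```

Today each job that needs this ships its own UDF; a job-level timezone option would remove that duplication.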

> Add support for timezones
> -
>
> Key: FLINK-8353
> URL: https://issues.apache.org/jira/browse/FLINK-8353
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Priority: Major
>
> This is an umbrella issue for adding support for timezones in the Table & SQL 
> API.
> Usually companies work with different timezones simultaneously. We could add 
> support for the new time classes introduced with Java 8 and enable our scalar 
> functions to also work with those (or some custom time class implementations 
> like those from Calcite). We need a good design for this to address most of 
> the problems users face related to timestamp and timezones.
> It is up for discussion how to ship date, time, timestamp instances through 
> the cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)