[jira] [Assigned] (FLINK-10506) Introduce minimum, target and maximum parallelism to JobGraph

2018-10-11 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-10506:


Assignee: Gary Yao

> Introduce minimum, target and maximum parallelism to JobGraph
> -
>
> Key: FLINK-10506
> URL: https://issues.apache.org/jira/browse/FLINK-10506
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Major
> Fix For: 1.7.0
>
>
> In order to run a job with a variable parallelism, one needs to be able to 
> define the minimum and maximum parallelism for an operator as well as the 
> current target value. In the first implementation, minimum could be 1 and 
> maximum the max parallelism of the operator if no explicit parallelism has 
> been specified for an operator. If a parallelism p has been specified (via 
> setParallelism(p)), then minimum = maximum = p. The target value could be the 
> command line parameter -p or the default parallelism.
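The rules described above can be sketched as follows. The class and method names here are purely illustrative, not actual Flink API:

```java
// Hypothetical sketch of the min/target/max parallelism derivation
// described in the issue; not an existing Flink class.
public class ParallelismRange {
    final int min;
    final int target;
    final int max;

    ParallelismRange(int min, int target, int max) {
        this.min = min;
        this.target = target;
        this.max = max;
    }

    /** explicitParallelism <= 0 means "not set via setParallelism(p)". */
    static ParallelismRange forOperator(int explicitParallelism,
                                        int maxParallelism,
                                        int defaultParallelism) {
        if (explicitParallelism > 0) {
            // setParallelism(p) pins the range: minimum = maximum = p
            return new ParallelismRange(
                explicitParallelism, explicitParallelism, explicitParallelism);
        }
        // otherwise: minimum = 1, maximum = the operator's max parallelism,
        // target = the -p / default parallelism, clamped into [1, max]
        int target = Math.min(Math.max(defaultParallelism, 1), maxParallelism);
        return new ParallelismRange(1, target, maxParallelism);
    }
}
```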



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()

2018-10-11 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647542#comment-16647542
 ] 

Bowen Li commented on FLINK-10168:
--

Well, the params of {{readFile()}} do feel a bit crowded, though. Hopefully at 
some point we can abstract all the params into a configuration, in the same 
style as other config-based sources.

> support filtering files by modified/created time in 
> StreamExecutionEnvironment.readFile()
> -
>
> Key: FLINK-10168
> URL: https://issues.apache.org/jira/browse/FLINK-10168
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.7.0
>
>
> support filtering files by modified/created time in 
> {{StreamExecutionEnvironment.readFile()}}
> for example, in a source dir with lots of files, we only want to read files 
> that were created or modified after a specific time.
> This API can expose a generic filter function over files, and let users define 
> filtering rules. Currently Flink only supports filtering files by path. What 
> this means is that, currently, the API is 
> {{FileInputFormat.setFilesFilters(PathFilter)}}, which takes only one file 
> path filter. A more generic API that can take more filters could look like:
> 1) {{FileInputFormat.setFilesFilters(List (PathFilter, ModifiedTimeFilter, ... 
> ))}}
> 2) or {{FileInputFormat.setFilesFilters(FileFilter)}}, where {{FileFilter}} 
> exposes all file attributes that Flink's file system can provide, like path 
> and modified time.
> I lean towards the 2nd option, because it gives users more flexibility to 
> define complex filtering rules based on combinations of file attributes.
>  
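A minimal sketch of what the 2nd option could look like; the {{FileFilter}}-style predicate and the {{FileAttributes}} class here are hypothetical names, not an existing Flink API:

```java
import java.util.function.Predicate;

// Illustrative sketch of option 2: a single filter over file attributes,
// which users can combine freely. Names are hypothetical.
public class FileFilterSketch {
    /** Minimal stand-in for the attributes a file system could expose. */
    static class FileAttributes {
        final String path;
        final long modificationTime; // epoch millis

        FileAttributes(String path, long modificationTime) {
            this.path = path;
            this.modificationTime = modificationTime;
        }
    }

    /** Accept only files modified after the given timestamp. */
    static Predicate<FileAttributes> modifiedAfter(long ts) {
        return f -> f.modificationTime > ts;
    }

    /** Accept only files whose path ends with the given suffix. */
    static Predicate<FileAttributes> pathEndsWith(String suffix) {
        return f -> f.path.endsWith(suffix);
    }
}
```

Because the filter is a plain predicate over all attributes, complex rules compose naturally, e.g. `modifiedAfter(ts).and(pathEndsWith(".csv"))`.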





[jira] [Updated] (FLINK-10533) job parallelism equals task slots number but not use the same number of the task slots as the parallelism

2018-10-11 Thread sean.miao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sean.miao updated FLINK-10533:
--
Description: 
I use the Table API and do not use the DataStream API.

 

My job has two graphs, and each has a parallelism of two, so the total 
parallelism is four.

But if I give my job four slots, it uses just two.

 

!image-2018-10-12-10-36-13-503.png!

!image-2018-10-12-10-35-57-443.png!

 

thanks.

  was:
I use the Table API and do not use the DataStream API.

 

My job has two graphs, and each has a parallelism of two.

If I give my job four slots, it uses just two.

 

!image-2018-10-12-10-36-13-503.png!

!image-2018-10-12-10-35-57-443.png!

 

thanks.


> job parallelism equals task slots number but not use the same number of the 
> task slots as the parallelism
> -
>
> Key: FLINK-10533
> URL: https://issues.apache.org/jira/browse/FLINK-10533
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.3, 1.6.0, 1.6.1
>Reporter: sean.miao
>Priority: Major
> Attachments: image-2018-10-12-10-35-57-443.png, 
> image-2018-10-12-10-36-13-503.png
>
>
> I use the Table API and do not use the DataStream API.
>  
> My job has two graphs, and each has a parallelism of two, so the total 
> parallelism is four.
> But if I give my job four slots, it uses just two.
>  
> !image-2018-10-12-10-36-13-503.png!
> !image-2018-10-12-10-35-57-443.png!
>  
> thanks.





[jira] [Updated] (FLINK-10533) job parallelism equals task slots number but not use the same number of the task slots as the parallelism

2018-10-11 Thread sean.miao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sean.miao updated FLINK-10533:
--
Summary: job parallelism equals task slots number but not use the same 
number of the task slots as the parallelism  (was: job parallelism equals task 
slots number but not use all task slot)

> job parallelism equals task slots number but not use the same number of the 
> task slots as the parallelism
> -
>
> Key: FLINK-10533
> URL: https://issues.apache.org/jira/browse/FLINK-10533
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.3, 1.6.0, 1.6.1
>Reporter: sean.miao
>Priority: Major
> Attachments: image-2018-10-12-10-35-57-443.png, 
> image-2018-10-12-10-36-13-503.png
>
>
> I use the Table API and do not use the DataStream API.
>  
> My job has two graphs, and each has a parallelism of two.
> If I give my job four slots, it uses just two.
>  
> !image-2018-10-12-10-36-13-503.png!
> !image-2018-10-12-10-35-57-443.png!
>  
> thanks.





[jira] [Updated] (FLINK-10533) job parallelism equals task slots number but not use all task slot

2018-10-11 Thread sean.miao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sean.miao updated FLINK-10533:
--
Summary: job parallelism equals task slots number but not use all task slot 
 (was: job parallelism equals task slots number but not use all tasl slot)

> job parallelism equals task slots number but not use all task slot
> --
>
> Key: FLINK-10533
> URL: https://issues.apache.org/jira/browse/FLINK-10533
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.3, 1.6.0, 1.6.1
>Reporter: sean.miao
>Priority: Major
> Attachments: image-2018-10-12-10-35-57-443.png, 
> image-2018-10-12-10-36-13-503.png
>
>
> I use the Table API and do not use the DataStream API.
>  
> My job has two graphs, and each has a parallelism of two.
> If I give my job four slots, it uses just two.
>  
> !image-2018-10-12-10-36-13-503.png!
> !image-2018-10-12-10-35-57-443.png!
>  
> thanks.





[jira] [Updated] (FLINK-10533) job parallelism equals task slots number but not use all tasl slot

2018-10-11 Thread sean.miao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sean.miao updated FLINK-10533:
--
Summary: job parallelism equals task slots number but not use all tasl slot 
 (was: job parallelism equals task slot number but not use all tasl slot)

> job parallelism equals task slots number but not use all tasl slot
> --
>
> Key: FLINK-10533
> URL: https://issues.apache.org/jira/browse/FLINK-10533
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.3, 1.6.0, 1.6.1
>Reporter: sean.miao
>Priority: Major
> Attachments: image-2018-10-12-10-35-57-443.png, 
> image-2018-10-12-10-36-13-503.png
>
>
> I use the Table API and do not use the DataStream API.
>  
> My job has two graphs, and each has a parallelism of two.
> If I give my job four slots, it uses just two.
>  
> !image-2018-10-12-10-36-13-503.png!
> !image-2018-10-12-10-35-57-443.png!
>  
> thanks.





[jira] [Updated] (FLINK-10533) job parallelism equals task slot number but not use all tasl slot

2018-10-11 Thread sean.miao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sean.miao updated FLINK-10533:
--
Affects Version/s: 1.5.3
   1.6.0

> job parallelism equals task slot number but not use all tasl slot
> -
>
> Key: FLINK-10533
> URL: https://issues.apache.org/jira/browse/FLINK-10533
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.3, 1.6.0, 1.6.1
>Reporter: sean.miao
>Priority: Major
> Attachments: image-2018-10-12-10-35-57-443.png, 
> image-2018-10-12-10-36-13-503.png
>
>
> I use the Table API and do not use the DataStream API.
>  
> My job has two graphs, and each has a parallelism of two.
> If I give my job four slots, it uses just two.
>  
> !image-2018-10-12-10-36-13-503.png!
> !image-2018-10-12-10-35-57-443.png!
>  
> thanks.





[jira] [Commented] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()

2018-10-11 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647534#comment-16647534
 ] 

Bowen Li commented on FLINK-10168:
--

On second thought, extending my comment about making this function similar to 
message queues' start-position feature: I feel defining it as a filter API 
would actually make it more complex than users anticipate.

How about just providing a {{startPosition}} param to the {{readFile()}} API, 
to make it as simple as those in Flink's Kinesis/Kafka consumers? [~kkl0u] 
[~fhueske]

This way, it might also be easier for Flink SQL to parse when we need to add 
SQL file sources in the future.
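A sketch of how such a {{startPosition}} parameter could behave, modeled loosely on the start-position options of Flink's Kafka/Kinesis consumers; the enum and method below are hypothetical, not an existing Flink API:

```java
// Hypothetical start-position semantics for a file source, deciding
// whether a file should be read based on its modification time.
public class StartPositionSketch {
    enum StartPosition { EARLIEST, LATEST, TIMESTAMP }

    static boolean accept(long fileModTime, StartPosition pos,
                          long fromTimestamp, long now) {
        switch (pos) {
            case EARLIEST:
                return true;                        // read every file
            case LATEST:
                return fileModTime >= now;          // only files appearing from now on
            case TIMESTAMP:
                return fileModTime >= fromTimestamp; // files from a given point in time
            default:
                return false;
        }
    }
}
```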

> support filtering files by modified/created time in 
> StreamExecutionEnvironment.readFile()
> -
>
> Key: FLINK-10168
> URL: https://issues.apache.org/jira/browse/FLINK-10168
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.7.0
>
>
> support filtering files by modified/created time in 
> {{StreamExecutionEnvironment.readFile()}}
> for example, in a source dir with lots of files, we only want to read files 
> that were created or modified after a specific time.
> This API can expose a generic filter function over files, and let users define 
> filtering rules. Currently Flink only supports filtering files by path. What 
> this means is that, currently, the API is 
> {{FileInputFormat.setFilesFilters(PathFilter)}}, which takes only one file 
> path filter. A more generic API that can take more filters could look like:
> 1) {{FileInputFormat.setFilesFilters(List (PathFilter, ModifiedTimeFilter, ... 
> ))}}
> 2) or {{FileInputFormat.setFilesFilters(FileFilter)}}, where {{FileFilter}} 
> exposes all file attributes that Flink's file system can provide, like path 
> and modified time.
> I lean towards the 2nd option, because it gives users more flexibility to 
> define complex filtering rules based on combinations of file attributes.
>  





[jira] [Comment Edited] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()

2018-10-11 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635923#comment-16635923
 ] 

Bowen Li edited comment on FLINK-10168 at 10/12/18 6:05 AM:


[~kkl0u] what do you think about this task? 

I believe this is very important; it provides users with functionality similar 
to that of streaming sources (Kinesis, Kafka), where users specify a start 
position in the stream to read data from a certain point in time.


was (Author: phoenixjiangnan):
[~kkl0u] what do you think about this task? 

I believe this is very important; it provides users with functionality similar 
to that of streaming sources (Kinesis, Kafka), where users can read data from a 
certain point in time.

> support filtering files by modified/created time in 
> StreamExecutionEnvironment.readFile()
> -
>
> Key: FLINK-10168
> URL: https://issues.apache.org/jira/browse/FLINK-10168
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.7.0
>
>
> support filtering files by modified/created time in 
> {{StreamExecutionEnvironment.readFile()}}
> for example, in a source dir with lots of files, we only want to read files 
> that were created or modified after a specific time.
> This API can expose a generic filter function over files, and let users define 
> filtering rules. Currently Flink only supports filtering files by path. What 
> this means is that, currently, the API is 
> {{FileInputFormat.setFilesFilters(PathFilter)}}, which takes only one file 
> path filter. A more generic API that can take more filters could look like:
> 1) {{FileInputFormat.setFilesFilters(List (PathFilter, ModifiedTimeFilter, ... 
> ))}}
> 2) or {{FileInputFormat.setFilesFilters(FileFilter)}}, where {{FileFilter}} 
> exposes all file attributes that Flink's file system can provide, like path 
> and modified time.
> I lean towards the 2nd option, because it gives users more flexibility to 
> define complex filtering rules based on combinations of file attributes.
>  





[jira] [Assigned] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()

2018-10-11 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-10168:


Assignee: Bowen Li  (was: Jiayi Liao)

> support filtering files by modified/created time in 
> StreamExecutionEnvironment.readFile()
> -
>
> Key: FLINK-10168
> URL: https://issues.apache.org/jira/browse/FLINK-10168
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.7.0
>
>
> support filtering files by modified/created time in 
> {{StreamExecutionEnvironment.readFile()}}
> for example, in a source dir with lots of files, we only want to read files 
> that were created or modified after a specific time.
> This API can expose a generic filter function over files, and let users define 
> filtering rules. Currently Flink only supports filtering files by path. What 
> this means is that, currently, the API is 
> {{FileInputFormat.setFilesFilters(PathFilter)}}, which takes only one file 
> path filter. A more generic API that can take more filters could look like:
> 1) {{FileInputFormat.setFilesFilters(List (PathFilter, ModifiedTimeFilter, ... 
> ))}}
> 2) or {{FileInputFormat.setFilesFilters(FileFilter)}}, where {{FileFilter}} 
> exposes all file attributes that Flink's file system can provide, like path 
> and modified time.
> I lean towards the 2nd option, because it gives users more flexibility to 
> define complex filtering rules based on combinations of file attributes.
>  





[jira] [Commented] (FLINK-10534) Add idle timeout for a flink session cluster

2018-10-11 Thread ouyangzhe (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647499#comment-16647499
 ] 

ouyangzhe commented on FLINK-10534:
---

[~yanghua] Thanks :)

> Add idle timeout for a flink session cluster
> 
>
> Key: FLINK-10534
> URL: https://issues.apache.org/jira/browse/FLINK-10534
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Affects Versions: 1.7.0
>Reporter: ouyangzhe
>Priority: Major
> Attachments: 屏幕快照 2018-10-12 上午10.24.08.png
>
>
> The Flink session cluster on YARN keeps running even when it has no jobs 
> running at all, occupying YARN resources for no use.
> TaskManagers are released after an idle timeout, but the JobManager keeps 
> running.
> I propose adding a configuration to set an idle timeout for the JobManager 
> too: if no job is running after the specified timeout, the Flink cluster 
> shuts itself down.
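The proposed shutdown condition could be sketched as a simple check; the method below and any config key that would feed it (e.g. a hypothetical cluster idle-timeout option in flink-conf.yaml) are assumptions, not an existing Flink setting:

```java
// Sketch of the proposed JobManager idle-timeout check; hypothetical,
// not existing Flink code.
public class IdleTimeoutSketch {
    static boolean shouldShutDown(int runningJobs, long idleSinceMillis,
                                  long nowMillis, long idleTimeoutMillis) {
        if (idleTimeoutMillis <= 0) {
            return false; // zero/negative = disabled, keeping current behavior
        }
        // Shut down only if no job is running and the cluster has been
        // idle for at least the configured timeout.
        return runningJobs == 0
            && (nowMillis - idleSinceMillis) >= idleTimeoutMillis;
    }
}
```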





[jira] [Commented] (FLINK-10534) Add idle timeout for a flink session cluster

2018-10-11 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647462#comment-16647462
 ] 

vinoyang commented on FLINK-10534:
--

[~yeshan] I believe he will see it and give you permission; don't worry too 
much. There is a time difference between Germany and China, and they have not 
started working yet.

> Add idle timeout for a flink session cluster
> 
>
> Key: FLINK-10534
> URL: https://issues.apache.org/jira/browse/FLINK-10534
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Affects Versions: 1.7.0
>Reporter: ouyangzhe
>Priority: Major
> Attachments: 屏幕快照 2018-10-12 上午10.24.08.png
>
>
> The Flink session cluster on YARN keeps running even when it has no jobs 
> running at all, occupying YARN resources for no use.
> TaskManagers are released after an idle timeout, but the JobManager keeps 
> running.
> I propose adding a configuration to set an idle timeout for the JobManager 
> too: if no job is running after the specified timeout, the Flink cluster 
> shuts itself down.





[jira] [Commented] (FLINK-10534) Add idle timeout for a flink session cluster

2018-10-11 Thread ouyangzhe (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647381#comment-16647381
 ] 

ouyangzhe commented on FLINK-10534:
---

Hi~ [~till.rohrmann], I'd like to contribute. Can you give me permission to 
assign this JIRA to myself?

> Add idle timeout for a flink session cluster
> 
>
> Key: FLINK-10534
> URL: https://issues.apache.org/jira/browse/FLINK-10534
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Affects Versions: 1.7.0
>Reporter: ouyangzhe
>Priority: Major
> Attachments: 屏幕快照 2018-10-12 上午10.24.08.png
>
>
> The Flink session cluster on YARN keeps running even when it has no jobs 
> running at all, occupying YARN resources for no use.
> TaskManagers are released after an idle timeout, but the JobManager keeps 
> running.
> I propose adding a configuration to set an idle timeout for the JobManager 
> too: if no job is running after the specified timeout, the Flink cluster 
> shuts itself down.





[jira] [Commented] (FLINK-10534) Add idle timeout for a flink session cluster

2018-10-11 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647379#comment-16647379
 ] 

vinoyang commented on FLINK-10534:
--

[~yeshan] It seems that you currently have no JIRA contributor permission. I am 
happy to ping [~till.rohrmann] or [~Zentol] to assign you permissions.

> Add idle timeout for a flink session cluster
> 
>
> Key: FLINK-10534
> URL: https://issues.apache.org/jira/browse/FLINK-10534
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Affects Versions: 1.7.0
>Reporter: ouyangzhe
>Priority: Major
> Attachments: 屏幕快照 2018-10-12 上午10.24.08.png
>
>
> The Flink session cluster on YARN keeps running even when it has no jobs 
> running at all, occupying YARN resources for no use.
> TaskManagers are released after an idle timeout, but the JobManager keeps 
> running.
> I propose adding a configuration to set an idle timeout for the JobManager 
> too: if no job is running after the specified timeout, the Flink cluster 
> shuts itself down.





[jira] [Assigned] (FLINK-10534) Add idle timeout for a flink session cluster

2018-10-11 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10534:


Assignee: (was: vinoyang)

> Add idle timeout for a flink session cluster
> 
>
> Key: FLINK-10534
> URL: https://issues.apache.org/jira/browse/FLINK-10534
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Affects Versions: 1.7.0
>Reporter: ouyangzhe
>Priority: Major
> Attachments: 屏幕快照 2018-10-12 上午10.24.08.png
>
>
> The Flink session cluster on YARN keeps running even when it has no jobs 
> running at all, occupying YARN resources for no use.
> TaskManagers are released after an idle timeout, but the JobManager keeps 
> running.
> I propose adding a configuration to set an idle timeout for the JobManager 
> too: if no job is running after the specified timeout, the Flink cluster 
> shuts itself down.





[jira] [Commented] (FLINK-10534) Add idle timeout for a flink session cluster

2018-10-11 Thread ouyangzhe (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647376#comment-16647376
 ] 

ouyangzhe commented on FLINK-10534:
---

hi~ [~yanghua], I already have a prototype for this issue. Can you help assign 
it to me so I can work on the feature? (I have no permission to assign it to 
myself.)

> Add idle timeout for a flink session cluster
> 
>
> Key: FLINK-10534
> URL: https://issues.apache.org/jira/browse/FLINK-10534
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Affects Versions: 1.7.0
>Reporter: ouyangzhe
>Assignee: vinoyang
>Priority: Major
> Attachments: 屏幕快照 2018-10-12 上午10.24.08.png
>
>
> The Flink session cluster on YARN keeps running even when it has no jobs 
> running at all, occupying YARN resources for no use.
> TaskManagers are released after an idle timeout, but the JobManager keeps 
> running.
> I propose adding a configuration to set an idle timeout for the JobManager 
> too: if no job is running after the specified timeout, the Flink cluster 
> shuts itself down.





[jira] [Commented] (FLINK-10508) Port JobManagerITCase to new code base

2018-10-11 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647375#comment-16647375
 ] 

TisonKun commented on FLINK-10508:
--

checklist:

JobManagerITCase.The JobManager actor must handle jobs when not enough slots
JobManagerITCase.The JobManager actor must support immediate scheduling of a 
single vertex
JobManagerITCase.The JobManager actor must support queued scheduling of a 
single vertex
JobManagerITCase.The JobManager actor must support forward jobs
JobManagerITCase.The JobManager actor must support bipartite job
JobManagerITCase.The JobManager actor must support two input job failing edge 
mismatch
JobManagerITCase.The JobManager actor must support two input job
JobManagerITCase.The JobManager actor must support scheduling all at once
JobManagerITCase.The JobManager actor must handle job with a failing sender 
vertex
JobManagerITCase.The JobManager actor must handle job with an occasionally 
failing sender vertex
JobManagerITCase.The JobManager actor must handle job with a failing receiver 
vertex
JobManagerITCase.The JobManager actor must handle job with all vertices failing 
during instantiation
JobManagerITCase.The JobManager actor must handle job with some vertices 
failing during instantiation
JobManagerITCase.The JobManager actor must check that all job vertices have 
completed the call to finalizeOnMaster before the job completes
JobManagerITCase.The JobManager actor must remove execution graphs when the 
client ends the session explicitly
JobManagerITCase.The JobManager actor must remove execution graphs when the 
client's session times out
JobManagerITCase.The JobManager actor must handle trigger savepoint response 
for non-existing job
JobManagerITCase.The JobManager actor must handle trigger savepoint response 
for job with disabled checkpointing
JobManagerITCase.The JobManager actor must handle trigger savepoint response 
after trigger savepoint failure
JobManagerITCase.The JobManager actor must handle failed savepoint triggering
JobManagerITCase.The JobManager actor must handle trigger savepoint response 
after succeeded savepoint future

> Port JobManagerITCase to new code base
> --
>
> Key: FLINK-10508
> URL: https://issues.apache.org/jira/browse/FLINK-10508
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerITCase}} to new code base.





[jira] [Commented] (FLINK-10534) Add idle timeout for a flink session cluster

2018-10-11 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647373#comment-16647373
 ] 

vinoyang commented on FLINK-10534:
--

[~isunjin] Agreed.

> Add idle timeout for a flink session cluster
> 
>
> Key: FLINK-10534
> URL: https://issues.apache.org/jira/browse/FLINK-10534
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Affects Versions: 1.7.0
>Reporter: ouyangzhe
>Assignee: vinoyang
>Priority: Major
> Attachments: 屏幕快照 2018-10-12 上午10.24.08.png
>
>
> The Flink session cluster on YARN keeps running even when it has no jobs 
> running at all, occupying YARN resources for no use.
> TaskManagers are released after an idle timeout, but the JobManager keeps 
> running.
> I propose adding a configuration to set an idle timeout for the JobManager 
> too: if no job is running after the specified timeout, the Flink cluster 
> shuts itself down.





[jira] [Commented] (FLINK-10534) Add idle timeout for a flink session cluster

2018-10-11 Thread JIN SUN (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647372#comment-16647372
 ] 

JIN SUN commented on FLINK-10534:
-

A session can have the semantics of an expiry time. We could probably add a 
parameter to specify the expiry time when creating the session (maybe on the 
command line); the default value can be infinite to keep the current behavior.

 

> Add idle timeout for a flink session cluster
> 
>
> Key: FLINK-10534
> URL: https://issues.apache.org/jira/browse/FLINK-10534
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Affects Versions: 1.7.0
>Reporter: ouyangzhe
>Assignee: vinoyang
>Priority: Major
> Attachments: 屏幕快照 2018-10-12 上午10.24.08.png
>
>
> The Flink session cluster on YARN keeps running even when it has no jobs 
> running at all, occupying YARN resources for no use.
> TaskManagers are released after an idle timeout, but the JobManager keeps 
> running.
> I propose adding a configuration to set an idle timeout for the JobManager 
> too: if no job is running after the specified timeout, the Flink cluster 
> shuts itself down.





[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647370#comment-16647370
 ] 

ASF GitHub Bot commented on FLINK-10135:


yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report 
the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#issuecomment-429191782
 
 
   The errors display here; then I will try to trigger a Travis rebuild:
   
   ```
   17:18:24.891 [ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on 
project flink-s3-fs-base: Compilation failure: Compilation failure:
   17:18:24.891 [ERROR] 
/home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java:[38,1]
 cannot find symbol
   17:18:24.891 [ERROR] symbol:   static S3_MULTIPART_MIN_PART_SIZE
   17:18:24.891 [ERROR] location: class
   17:18:24.891 [ERROR] 
/home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java:[38,1]
 cannot find symbol
   17:18:24.891 [ERROR] symbol:   static S3_MULTIPART_MIN_PART_SIZE
   17:18:24.891 [ERROR] location: class
   17:18:24.891 [ERROR] 
/home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java:[137,57]
 cannot find symbol
   17:18:24.891 [ERROR] symbol:   variable S3_MULTIPART_MIN_PART_SIZE
   17:18:24.891 [ERROR] location: class 
org.apache.flink.fs.s3.common.writer.S3RecoverableWriter
   17:18:24.891 [ERROR] 
/home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java:[137,72]
 toHadoopPath(org.apache.flink.core.fs.Path) is not public in 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; cannot be accessed from 
outside package
   17:18:24.891 [ERROR] 
/home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java:[234,57]
 cannot find symbol
   17:18:24.891 [ERROR] symbol:   variable S3_MULTIPART_MIN_PART_SIZE
   17:18:24.891 [ERROR] location: class 
org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream
   17:18:24.891 [ERROR] 
/home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java:[252,57]
 cannot find symbol
   17:18:24.891 [ERROR] symbol:   variable S3_MULTIPART_MIN_PART_SIZE
   17:18:24.891 [ERROR] location: class 
org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The JobManager doesn't report the cluster-level metrics
> ---
>
> Key: FLINK-10135
> URL: https://issues.apache.org/jira/browse/FLINK-10135
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Metrics
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Joey Echeverria
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
>
> In [the documentation for 
> metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster]
>  in the Flink 1.5.0 release, it says that the following metrics are reported 
> by the JobManager:
> {noformat}
> numRegisteredTaskManagers
> numRunningJobs
> taskSlotsAvailable
> taskSlotsTotal
> {noformat}
> In the job manager REST endpoint 
> ({{http://:8081/jobmanager/metrics}}), those metrics don't 
> appear.
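As a quick check, those metrics can be requested explicitly through the monitoring REST API's `get` query parameter. A minimal sketch of building that request (host and port are assumptions):

```java
import java.util.List;

// Sketch: build the REST query for the cluster-level metrics named above.
// GET /jobmanager/metrics?get=<name,...> asks the JobManager for the values
// of the listed metrics; on an affected cluster they are simply absent.
final class MetricsQuery {
    static String url(String host, List<String> metricNames) {
        return "http://" + host + ":8081/jobmanager/metrics?get="
                + String.join(",", metricNames);
    }
}
```

On a healthy cluster, fetching such a URL should return a JSON array of id/value pairs for the requested metrics; on an affected cluster the four metrics above never show up.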





[GitHub] yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics

2018-10-11 Thread GitBox
yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report 
the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#issuecomment-429191782
 
 
   The compile errors display here; then I will try to trigger a Travis rebuild:
   
   ```
   17:18:24.891 [ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on 
project flink-s3-fs-base: Compilation failure: Compilation failure:
   17:18:24.891 [ERROR] 
/home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java:[38,1]
 cannot find symbol
   17:18:24.891 [ERROR] symbol:   static S3_MULTIPART_MIN_PART_SIZE
   17:18:24.891 [ERROR] location: class
   17:18:24.891 [ERROR] 
/home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java:[38,1]
 cannot find symbol
   17:18:24.891 [ERROR] symbol:   static S3_MULTIPART_MIN_PART_SIZE
   17:18:24.891 [ERROR] location: class
   17:18:24.891 [ERROR] 
/home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java:[137,57]
 cannot find symbol
   17:18:24.891 [ERROR] symbol:   variable S3_MULTIPART_MIN_PART_SIZE
   17:18:24.891 [ERROR] location: class 
org.apache.flink.fs.s3.common.writer.S3RecoverableWriter
   17:18:24.891 [ERROR] 
/home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java:[137,72]
 toHadoopPath(org.apache.flink.core.fs.Path) is not public in 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; cannot be accessed from 
outside package
   17:18:24.891 [ERROR] 
/home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java:[234,57]
 cannot find symbol
   17:18:24.891 [ERROR] symbol:   variable S3_MULTIPART_MIN_PART_SIZE
   17:18:24.891 [ERROR] location: class 
org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream
   17:18:24.891 [ERROR] 
/home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java:[252,57]
 cannot find symbol
   17:18:24.891 [ERROR] symbol:   variable S3_MULTIPART_MIN_PART_SIZE
   17:18:24.891 [ERROR] location: class 
org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream
   ```
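For readers puzzling over the log: the repeated "cannot find symbol: static S3_MULTIPART_MIN_PART_SIZE" at import position [38,1] means javac cannot resolve a statically imported constant, usually because it was moved, renamed, or its defining module is missing from the compile classpath. A self-contained sketch of the pattern (class names are hypothetical, not the real Flink sources):

```java
// Hypothetical illustration of the failing pattern. If S3Limits (or the
// constant inside it) is missing from the compile classpath, javac reports
// "cannot find symbol: static S3_MULTIPART_MIN_PART_SIZE" at the import site.
final class S3Limits {
    // S3 rejects non-final multipart parts smaller than 5 MiB.
    static final long S3_MULTIPART_MIN_PART_SIZE = 5L * 1024 * 1024;

    private S3Limits() {}
}

final class PartSizeCheck {
    static boolean isValidPartSize(long bytes) {
        return bytes >= S3Limits.S3_MULTIPART_MIN_PART_SIZE;
    }
}
```

Restoring the constant (or fixing the import to point at its new home) resolves all five errors at once.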




[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647366#comment-16647366
 ] 

ASF GitHub Bot commented on FLINK-10135:


yanghua edited a comment on issue #6702: [FLINK-10135] The JobManager does not 
report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#issuecomment-429191342
 
 
   I saw some compile errors, unrelated to this PR's changes, from 
`flink-s3-fs-hadoop` that caused Travis to fail. cc @tillrohrmann and @zentol 




> The JobManager doesn't report the cluster-level metrics
> ---
>
> Key: FLINK-10135
> URL: https://issues.apache.org/jira/browse/FLINK-10135
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Metrics
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Joey Echeverria
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
>
> In [the documentation for 
> metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster]
>  in the Flink 1.5.0 release, it says that the following metrics are reported 
> by the JobManager:
> {noformat}
> numRegisteredTaskManagers
> numRunningJobs
> taskSlotsAvailable
> taskSlotsTotal
> {noformat}
> In the job manager REST endpoint 
> ({{http://:8081/jobmanager/metrics}}), those metrics don't 
> appear.





[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647365#comment-16647365
 ] 

ASF GitHub Bot commented on FLINK-10135:


yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report 
the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#issuecomment-429191342
 
 
   I saw some errors unrelated to this PR's changes, from `flink-s3-fs-hadoop`. 
cc @tillrohrmann and @zentol 




> The JobManager doesn't report the cluster-level metrics
> ---
>
> Key: FLINK-10135
> URL: https://issues.apache.org/jira/browse/FLINK-10135
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Metrics
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Joey Echeverria
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
>
> In [the documentation for 
> metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster]
>  in the Flink 1.5.0 release, it says that the following metrics are reported 
> by the JobManager:
> {noformat}
> numRegisteredTaskManagers
> numRunningJobs
> taskSlotsAvailable
> taskSlotsTotal
> {noformat}
> In the job manager REST endpoint 
> ({{http://:8081/jobmanager/metrics}}), those metrics don't 
> appear.





[GitHub] yanghua edited a comment on issue #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics

2018-10-11 Thread GitBox
yanghua edited a comment on issue #6702: [FLINK-10135] The JobManager does not 
report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#issuecomment-429191342
 
 
   I saw some compile errors, unrelated to this PR's changes, from 
`flink-s3-fs-hadoop` that caused Travis to fail. cc @tillrohrmann and @zentol 




[GitHub] yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics

2018-10-11 Thread GitBox
yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report 
the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#issuecomment-429191342
 
 
   I saw some errors unrelated to this PR's changes, from `flink-s3-fs-hadoop`. 
cc @tillrohrmann and @zentol 




[jira] [Assigned] (FLINK-10534) Add idle timeout for a flink session cluster

2018-10-11 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10534:


Assignee: vinoyang

> Add idle timeout for a flink session cluster
> 
>
> Key: FLINK-10534
> URL: https://issues.apache.org/jira/browse/FLINK-10534
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Affects Versions: 1.7.0
>Reporter: ouyangzhe
>Assignee: vinoyang
>Priority: Major
> Attachments: 屏幕快照 2018-10-12 上午10.24.08.png
>
>
> A Flink session cluster on YARN keeps running even when it has no jobs 
> running at all, occupying YARN resources for no benefit.
> TaskManagers are released after an idle timeout, but the JobManager keeps 
> running.
> I propose adding a configuration option that applies an idle timeout to the 
> JobManager too: if no job runs within the specified timeout, the Flink 
> cluster shuts itself down.





[jira] [Commented] (FLINK-10534) Add idle timeout for a flink session cluster

2018-10-11 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647352#comment-16647352
 ] 

vinoyang commented on FLINK-10534:
--

Hi [~yeshan], thank you for making this suggestion; it seems reasonable. 
[~till.rohrmann], what do you think?

> Add idle timeout for a flink session cluster
> 
>
> Key: FLINK-10534
> URL: https://issues.apache.org/jira/browse/FLINK-10534
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Affects Versions: 1.7.0
>Reporter: ouyangzhe
>Assignee: vinoyang
>Priority: Major
> Attachments: 屏幕快照 2018-10-12 上午10.24.08.png
>
>
> A Flink session cluster on YARN keeps running even when it has no jobs 
> running at all, occupying YARN resources for no benefit.
> TaskManagers are released after an idle timeout, but the JobManager keeps 
> running.
> I propose adding a configuration option that applies an idle timeout to the 
> JobManager too: if no job runs within the specified timeout, the Flink 
> cluster shuts itself down.





[jira] [Created] (FLINK-10535) User jar is present in the flink job manager's class path

2018-10-11 Thread yinhua.dai (JIRA)
yinhua.dai created FLINK-10535:
--

 Summary: User jar is present in the flink job manager's class path
 Key: FLINK-10535
 URL: https://issues.apache.org/jira/browse/FLINK-10535
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.5.0
Reporter: yinhua.dai


Run Flink on YARN in attached, per-job mode, i.e. 
{code:java}
flink run -m yarn-cluster -yn 1 *** user.jar
{code}
The user.jar is not present in the Flink JobManager's classpath, whereas in 
Flink 1.3 we observed that it was by default placed at the beginning of the 
JobManager's classpath.





[jira] [Updated] (FLINK-10533) Job parallelism equals task slot number but does not use all task slots

2018-10-11 Thread sean.miao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sean.miao updated FLINK-10533:
--
Description: 
I use the Table API and do not use the DataStream API.

My job has two graphs, and each one's parallelism is two.

If I give my job four slots, it uses only two of them.

!image-2018-10-12-10-36-13-503.png!

!image-2018-10-12-10-35-57-443.png!

Thanks.

  was:
my job has two graph and every parallelism is two.

if give my job four slots but it just use two slots.

 

!image-2018-10-12-10-36-13-503.png!

!image-2018-10-12-10-35-57-443.png!

 

thanks.


> Job parallelism equals task slot number but does not use all task slots
> -
>
> Key: FLINK-10533
> URL: https://issues.apache.org/jira/browse/FLINK-10533
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.6.1
>Reporter: sean.miao
>Priority: Major
> Attachments: image-2018-10-12-10-35-57-443.png, 
> image-2018-10-12-10-36-13-503.png
>
>
> I use the Table API and do not use the DataStream API.
>  
> My job has two graphs, and each one's parallelism is two.
> If I give my job four slots, it uses only two of them.
>  
> !image-2018-10-12-10-36-13-503.png!
> !image-2018-10-12-10-35-57-443.png!
>  
> Thanks.





[jira] [Created] (FLINK-10534) Add idle timeout for a flink session cluster

2018-10-11 Thread ouyangzhe (JIRA)
ouyangzhe created FLINK-10534:
-

 Summary: Add idle timeout for a flink session cluster
 Key: FLINK-10534
 URL: https://issues.apache.org/jira/browse/FLINK-10534
 Project: Flink
  Issue Type: New Feature
  Components: Cluster Management
Affects Versions: 1.7.0
Reporter: ouyangzhe
 Attachments: 屏幕快照 2018-10-12 上午10.24.08.png

A Flink session cluster on YARN keeps running even when it has no jobs 
running at all, occupying YARN resources for no benefit.

TaskManagers are released after an idle timeout, but the JobManager keeps 
running.

I propose adding a configuration option that applies an idle timeout to the 
JobManager too: if no job runs within the specified timeout, the Flink 
cluster shuts itself down.
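As a sketch, the proposal could surface as a single flink-conf.yaml entry; the key name below is purely illustrative and is not an existing Flink option:

```yaml
# Hypothetical option (name illustrative only): shut the session cluster down,
# JobManager included, after this many milliseconds with no running jobs.
# A negative value would keep today's behavior (never time out).
jobmanager.session.idle-timeout-ms: 600000
```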





[jira] [Created] (FLINK-10533) Job parallelism equals task slot number but does not use all task slots

2018-10-11 Thread sean.miao (JIRA)
sean.miao created FLINK-10533:
-

 Summary: Job parallelism equals task slot number but does not use all 
task slots
 Key: FLINK-10533
 URL: https://issues.apache.org/jira/browse/FLINK-10533
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.6.1
Reporter: sean.miao
 Attachments: image-2018-10-12-10-35-57-443.png, 
image-2018-10-12-10-36-13-503.png

My job has two graphs, and each one's parallelism is two.

If I give my job four slots, it uses only two of them.

!image-2018-10-12-10-36-13-503.png!

!image-2018-10-12-10-35-57-443.png!

Thanks.





[jira] [Commented] (FLINK-10367) Avoid recursion stack overflow during releasing SingleInputGate

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647316#comment-16647316
 ] 

ASF GitHub Bot commented on FLINK-10367:


zhijiangW opened a new pull request #6829: [FLINK-10367][network] Introduce 
NotificationResult for BufferListener to solve recursive stack overflow
URL: https://github.com/apache/flink/pull/6829
 
 
   ## What is the purpose of the change
   
   *In the process of `LocalBufferPool#recycle`, the recycled buffer is 
offered to a `BufferListener`. But this `BufferListener` may no longer need 
floating buffers, so the buffer is recycled again to the `LocalBufferPool`, 
and another `BufferListener` is selected to be notified of the available 
buffer.
   
   This process may be triggered repeatedly in a recursive way, causing a 
stack overflow error in extreme cases. We once encountered this error, 
triggered by releasing all resources during task failover in a large-scale 
job; notably, it also results in a buffer leak after the stack overflow.*
   
   ## Brief change log
   
 - *Introduce the `NotificationResult` for describing the notification 
result for `BufferListener`*
 - *Adjust the process of `LocalBufferPool#recycle` to avoid recursive call*
 - *Adjust the process of `RemoteInputChannel#notifyBufferAvailable` 
accordingly*
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




> Avoid recursion stack overflow during releasing SingleInputGate
> ---
>
> Key: FLINK-10367
> URL: https://issues.apache.org/jira/browse/FLINK-10367
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>
> On task failure or cancellation, {{SingleInputGate#releaseAllResources}} 
> is invoked before the task exits.
> In {{SingleInputGate#releaseAllResources}}, we first loop to release all 
> the input channels, then destroy the {{BufferPool}}. 
> {{RemoteInputChannel#releaseAllResources}} returns floating buffers to the 
> {{BufferPool}}, which assigns each recycled buffer to one of the other 
> listeners ({{RemoteInputChannel}}s). 
> A recursive call can occur in this process: if the chosen listener was 
> already released, it directly recycles the buffer back to the 
> {{BufferPool}}, which picks another listener to notify of the available 
> buffer. This can repeat recursively.
> If there are many input channels registered as listeners in the 
> {{BufferPool}}, the recursion causes a {{StackOverflow}} error. In our test 
> job, a scale of 10,000 input channels triggered this error.
> I see two ways to solve this potential problem:
>  # When an input channel is released, it should ask the {{BufferPool}} to 
> unregister it as a listener; otherwise the two become inconsistent.
>  # {{SingleInputGate}} should destroy the {{BufferPool}} first, then loop 
> to release all the internal input channels. That way, all listeners in the 
> {{BufferPool}} are removed during destruction, and the input channels have 
> no further interactions during 
> {{RemoteInputChannel#releaseAllResources}}.
> I prefer the second way, because we do not want to add another interface 
> method for removing a buffer listener, and the current internal data 
> structure of the {{BufferPool}} cannot remove a listener directly.
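The recursion described above, and the loop-based fix the PR introduces via `NotificationResult`, can be sketched roughly as follows (a simplified model, not the actual Flink classes):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of the fix: instead of a declined notification re-entering
// recycle() recursively, the pool loops over listeners until one keeps the
// buffer, keeping stack depth constant regardless of listener count.
final class LocalPool {
    interface BufferListener {
        // true = listener kept the buffer; false = declined (e.g. already released)
        boolean notifyBufferAvailable(Object buffer);
    }

    private final Deque<BufferListener> listeners = new ArrayDeque<>();
    private int freeBuffers;

    void register(BufferListener listener) {
        listeners.add(listener);
    }

    void recycle(Object buffer) {
        BufferListener next;
        while ((next = listeners.poll()) != null) {
            if (next.notifyBufferAvailable(buffer)) {
                return; // buffer handed off, stop
            }
            // declined: try the next listener in the same loop iteration
        }
        freeBuffers++; // no listener wanted it: back to the free pool
    }

    int freeBuffers() {
        return freeBuffers;
    }
}
```

With 10,000 released listeners this loop runs 10,000 iterations at constant stack depth, whereas the recursive variant would nest 10,000 calls.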





[jira] [Updated] (FLINK-10367) Avoid recursion stack overflow during releasing SingleInputGate

2018-10-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10367:
---
Labels: pull-request-available  (was: )

> Avoid recursion stack overflow during releasing SingleInputGate
> ---
>
> Key: FLINK-10367
> URL: https://issues.apache.org/jira/browse/FLINK-10367
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>
> On task failure or cancellation, {{SingleInputGate#releaseAllResources}} 
> is invoked before the task exits.
> In {{SingleInputGate#releaseAllResources}}, we first loop to release all 
> the input channels, then destroy the {{BufferPool}}. 
> {{RemoteInputChannel#releaseAllResources}} returns floating buffers to the 
> {{BufferPool}}, which assigns each recycled buffer to one of the other 
> listeners ({{RemoteInputChannel}}s). 
> A recursive call can occur in this process: if the chosen listener was 
> already released, it directly recycles the buffer back to the 
> {{BufferPool}}, which picks another listener to notify of the available 
> buffer. This can repeat recursively.
> If there are many input channels registered as listeners in the 
> {{BufferPool}}, the recursion causes a {{StackOverflow}} error. In our test 
> job, a scale of 10,000 input channels triggered this error.
> I see two ways to solve this potential problem:
>  # When an input channel is released, it should ask the {{BufferPool}} to 
> unregister it as a listener; otherwise the two become inconsistent.
>  # {{SingleInputGate}} should destroy the {{BufferPool}} first, then loop 
> to release all the internal input channels. That way, all listeners in the 
> {{BufferPool}} are removed during destruction, and the input channels have 
> no further interactions during 
> {{RemoteInputChannel#releaseAllResources}}.
> I prefer the second way, because we do not want to add another interface 
> method for removing a buffer listener, and the current internal data 
> structure of the {{BufferPool}} cannot remove a listener directly.





[GitHub] zhijiangW opened a new pull request #6829: [FLINK-10367][network] Introduce NotificationResult for BufferListener to solve recursive stack overflow

2018-10-11 Thread GitBox
zhijiangW opened a new pull request #6829: [FLINK-10367][network] Introduce 
NotificationResult for BufferListener to solve recursive stack overflow
URL: https://github.com/apache/flink/pull/6829
 
 
   ## What is the purpose of the change
   
   *In the process of `LocalBufferPool#recycle`, the recycled buffer is 
offered to a `BufferListener`. But this `BufferListener` may no longer need 
floating buffers, so the buffer is recycled again to the `LocalBufferPool`, 
and another `BufferListener` is selected to be notified of the available 
buffer.
   
   This process may be triggered repeatedly in a recursive way, causing a 
stack overflow error in extreme cases. We once encountered this error, 
triggered by releasing all resources during task failover in a large-scale 
job; notably, it also results in a buffer leak after the stack overflow.*
   
   ## Brief change log
   
 - *Introduce the `NotificationResult` for describing the notification 
result for `BufferListener`*
 - *Adjust the process of `LocalBufferPool#recycle` to avoid recursive call*
 - *Adjust the process of `RemoteInputChannel#notifyBufferAvailable` 
accordingly*
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[jira] [Commented] (FLINK-9544) Downgrade kinesis protocol from CBOR to JSON not possible as required by kinesalite

2018-10-11 Thread Diego Carvallo (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647280#comment-16647280
 ] 

Diego Carvallo commented on FLINK-9544:
---

[~eyushin] That does not work either in v1.6.0. I tried several options and 
none of them worked locally:
{code:java}
env.java.opts: "-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor"
env.java.opts: 
"-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor=true"
env.java.opts: 
"-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor=1"
env.java.opts: "-Dcom.amazonaws.sdk.disableCbor"
env.java.opts: "-Dcom.amazonaws.sdk.disableCbor=true"
env.java.opts: "-Dcom.amazonaws.sdk.disableCbor=1"
{code}
 I think it may be related to what's explained in this thread: 
[https://stackoverflow.com/questions/42344624/apache-flink-custom-java-options-are-not-recognized-inside-job]
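For context on what these options are trying to toggle: the AWS SDK consults the `com.amazonaws.sdk.disableCbor` system property (and the `AWS_CBOR_DISABLE` environment variable). When the Kinesis connector shades the SDK, the property lookup key gets relocated too, which is why both the plain and the `org.apache.flink.kinesis.shaded.` prefixed spellings were attempted. A rough sketch of the lookup (simplified, not the SDK's actual code):

```java
// Simplified sketch of the CBOR kill-switch lookup. A shaded SDK ends up
// reading the relocated property name instead of this one, so setting the
// plain name has no effect -- matching the behavior reported above.
final class CborSwitch {
    static final String PROP = "com.amazonaws.sdk.disableCbor";

    static boolean cborDisabled() {
        String env = System.getenv("AWS_CBOR_DISABLE");
        return Boolean.parseBoolean(System.getProperty(PROP, "false"))
                || "true".equalsIgnoreCase(env) || "1".equals(env);
    }
}
```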

> Downgrade kinesis protocol from CBOR to JSON not possible as required by 
> kinesalite
> ---
>
> Key: FLINK-9544
> URL: https://issues.apache.org/jira/browse/FLINK-9544
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.5.0, 1.4.1, 1.4.2
>Reporter: Ph.Duveau
>Priority: Critical
>
> The Amazon client does not downgrade from CBOR to JSON when the env var 
> AWS_CBOR_DISABLE is set to true (or 1) and/or 
> com.amazonaws.sdk.disableCbor=true is defined via JVM options. This bug is 
> due to the Maven Shade relocation of com.amazon.* classes. As soon as you 
> cancel this relocation (by removing it in the Kinesis connector or by 
> re-relocating in the final jar), it runs again.





[jira] [Commented] (FLINK-10265) Configure checkpointing behavior for SQL Client

2018-10-11 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647218#comment-16647218
 ] 

sunjincheng commented on FLINK-10265:
-

Hi [~twalthr] [~yanghua], I agree that we should unify batch and streaming. 
BTW, we should also add the checkpoint timeout configuration, something like:
{code:java}
...
checkpointing:
   timeout: 60
   ...{code}

> Configure checkpointing behavior for SQL Client
> ---
>
> Key: FLINK-10265
> URL: https://issues.apache.org/jira/browse/FLINK-10265
> Project: Flink
>  Issue Type: New Feature
>  Components: SQL Client
>Reporter: Timo Walther
>Assignee: vinoyang
>Priority: Major
>
> The SQL Client environment file should expose checkpointing related 
> properties:
> - enable checkpointing
> - checkpointing interval
> - mode
> - timeout
> - etc. see {{org.apache.flink.streaming.api.environment.CheckpointConfig}}
> Per-job selection of state backends and their configuration.
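A fuller sketch of such an environment-file section, with key names mirroring `CheckpointConfig` fields (illustrative only; the format is not finalized):

```yaml
# Hypothetical SQL Client environment file section (key names not final):
checkpointing:
  enabled: true          # turn checkpointing on for submitted queries
  interval: 60000        # ms between checkpoint triggers
  mode: exactly-once     # or: at-least-once
  timeout: 600000        # ms before an in-flight checkpoint is aborted
  min-pause: 5000        # ms minimum pause between two checkpoints
```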





[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647203#comment-16647203
 ] 

ASF GitHub Bot commented on FLINK-10516:


suez1224 commented on a change in pull request #6828: [FLINK-10516] [yarn] fix 
YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink 
Configuration during setup
URL: https://github.com/apache/flink/pull/6828#discussion_r224639273
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
 ##
 @@ -109,4 +120,37 @@ public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
taskManagerConf, workingDirectory, 
taskManagerMainClass, LOG);
assertEquals("file", 
ctx.getLocalResources().get("flink.jar").getResource().getScheme());
}
+
+   @Test
+   public void testRunAndInitializeFileSystem() throws Exception {
+   // Mock necessary system variables
+   Map&lt;String, String&gt; map = new HashMap&lt;&gt;(System.getenv());
+   map.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, "foo");
+   // Create dynamic properties to be used in the Flink 
configuration
+   map.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, "myKey=myValue");
+   CommonTestUtils.setEnv(map);
+
+   // Create a temporary flink-conf.yaml and to be deleted on JVM 
exits
+   File currDir = new 
File(System.getenv().get(ApplicationConstants.Environment.PWD.key()));
+   String path = String.format("%s/%s.%s", currDir, "flink-conf", 
"yaml");
+   File f = new File(path);
+   f.createNewFile();
+   f.deleteOnExit();
+
+   // Mock FileSystem.initialize()
+   PowerMockito.mockStatic(FileSystem.class);
+   PowerMockito.doNothing().when(FileSystem.class);
+   FileSystem.initialize(any(Configuration.class));
+
+   String[] args = new String[5];
+   YarnApplicationMasterRunner yarnApplicationMasterRunner = new 
YarnApplicationMasterRunner();
+   yarnApplicationMasterRunner.run(args);
 
 Review comment:
   I think we should refactor the code like YarnTaskManagerRunnerFactoryTest; 
running the entire AppMaster code in a unit test is too much.




> YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink 
> Configuration during setup
> ---
>
> Key: FLINK-10516
> URL: https://issues.apache.org/jira/browse/FLINK-10516
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.4.0, 1.5.0, 1.6.0, 1.7.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Will add a fix and refactor YarnApplicationMasterRunner, adding a unit test 
> to prevent future regressions.





[GitHub] suez1224 commented on a change in pull request #6828: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup

2018-10-11 Thread GitBox
suez1224 commented on a change in pull request #6828: [FLINK-10516] [yarn] fix 
YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink 
Configuration during setup
URL: https://github.com/apache/flink/pull/6828#discussion_r224639273
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
 ##
 @@ -109,4 +120,37 @@ public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
taskManagerConf, workingDirectory, 
taskManagerMainClass, LOG);
assertEquals("file", 
ctx.getLocalResources().get("flink.jar").getResource().getScheme());
}
+
+   @Test
+   public void testRunAndInitializeFileSystem() throws Exception {
+   // Mock necessary system variables
+   Map&lt;String, String&gt; map = new HashMap&lt;&gt;(System.getenv());
+   map.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, "foo");
+   // Create dynamic properties to be used in the Flink 
configuration
+   map.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, "myKey=myValue");
+   CommonTestUtils.setEnv(map);
+
+   // Create a temporary flink-conf.yaml and to be deleted on JVM 
exits
+   File currDir = new 
File(System.getenv().get(ApplicationConstants.Environment.PWD.key()));
+   String path = String.format("%s/%s.%s", currDir, "flink-conf", 
"yaml");
+   File f = new File(path);
+   f.createNewFile();
+   f.deleteOnExit();
+
+   // Mock FileSystem.initialize()
+   PowerMockito.mockStatic(FileSystem.class);
+   PowerMockito.doNothing().when(FileSystem.class);
+   FileSystem.initialize(any(Configuration.class));
+
+   String[] args = new String[5];
+   YarnApplicationMasterRunner yarnApplicationMasterRunner = new 
YarnApplicationMasterRunner();
+   yarnApplicationMasterRunner.run(args);
 
 Review comment:
   I think we should refactor the code like YarnTaskManagerRunnerFactoryTest. 
Running the entire appMaster code in a unit test is too much.
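The test quoted above seeds ENV_DYNAMIC_PROPERTIES with the string "myKey=myValue". As a rough illustration of how such key=value dynamic-property strings can be decoded into a map, here is a minimal sketch; the `DynamicProps` helper and the comma-separated format are assumptions for illustration only, not Flink's actual dynamic-properties parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Illustrative sketch (not Flink's actual parser): splits a dynamic-properties
 * string such as "myKey=myValue,other=42" into key/value pairs. Flink's real
 * encoding of ENV_DYNAMIC_PROPERTIES may differ; this only mirrors the shape
 * of the value used in the test above.
 */
public final class DynamicProps {

    private DynamicProps() {}

    public static Map<String, String> parse(String encoded) {
        Map<String, String> props = new LinkedHashMap<>();
        if (encoded == null || encoded.isEmpty()) {
            return props;
        }
        for (String pair : encoded.split(",")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                // key is everything before the first '=', value everything after
                props.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(parse("myKey=myValue")); // prints {myKey=myValue}
    }
}
```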


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647200#comment-16647200
 ] 

ASF GitHub Bot commented on FLINK-10516:


suez1224 commented on a change in pull request #6828: [FLINK-10516] [yarn] fix 
YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink 
Configuration during setup
URL: https://github.com/apache/flink/pull/6828#discussion_r224638155
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
 ##
 @@ -109,4 +120,37 @@ public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
taskManagerConf, workingDirectory, 
taskManagerMainClass, LOG);
assertEquals("file", 
ctx.getLocalResources().get("flink.jar").getResource().getScheme());
}
+
+   @Test
+   public void testRunAndInitializeFileSystem() throws Exception {
+   // Mock necessary system variables
+   Map<String, String> map = new HashMap<>(System.getenv());
+   map.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, "foo");
+   // Create dynamic properties to be used in the Flink 
configuration
+   map.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, "myKey=myValue");
+   CommonTestUtils.setEnv(map);
+
+   // Create a temporary flink-conf.yaml, to be deleted on JVM exit
+   File currDir = new 
File(System.getenv().get(ApplicationConstants.Environment.PWD.key()));
 
 Review comment:
   There is already a flink-conf.yaml in the test resource directory. You can 
reuse it.




> YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink 
> Configuration during setup
> ---
>
> Key: FLINK-10516
> URL: https://issues.apache.org/jira/browse/FLINK-10516
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.4.0, 1.5.0, 1.6.0, 1.7.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Will add a fix, and refactor YarnApplicationMasterRunner to add a unittest to 
> prevent future regression.







[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647196#comment-16647196
 ] 

ASF GitHub Bot commented on FLINK-10516:


suez1224 commented on issue #6828: [FLINK-10516] [yarn] fix 
YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink 
Configuration during setup
URL: https://github.com/apache/flink/pull/6828#issuecomment-429158706
 
 
   Can you write the fix against the current master? Also, please fix the test 
failures.











[jira] [Commented] (FLINK-5465) RocksDB fails with segfault while calling AbstractRocksDBState.clear()

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647154#comment-16647154
 ] 

ASF GitHub Bot commented on FLINK-5465:
---

zorro786 commented on issue #5058: [FLINK-5465] [streaming] Wait for pending 
timer threads to finish or …
URL: https://github.com/apache/flink/pull/5058#issuecomment-429150903
 
 
   @StefanRRichter It is mentioned in the Jira ticket that this affects 
versions 1.5.0, 1.4.2, and 1.6.0.
   Could you please tell why it doesn't affect 1.4.0?




> RocksDB fails with segfault while calling AbstractRocksDBState.clear()
> --
>
> Key: FLINK-5465
> URL: https://issues.apache.org/jira/browse/FLINK-5465
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.4.0, 1.5.0
>
> Attachments: hs-err-pid26662.log
>
>
> I'm using Flink 699f4b0.
> {code}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x7f91a0d49b78, pid=26662, tid=140263356024576
> #
> # JRE version: Java(TM) SE Runtime Environment (7.0_67-b01) (build 
> 1.7.0_67-b01)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode 
> linux-amd64 compressed oops)
> # Problematic frame:
> # C  [librocksdbjni-linux64.so+0x1aeb78]  
> rocksdb::GetColumnFamilyID(rocksdb::ColumnFamilyHandle*)+0x8
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core 
> dumping, try "ulimit -c unlimited" before starting Java again
> #
> # An error report file with more information is saved as:
> # 
> /yarn/nm/usercache/robert/appcache/application_1484132267957_0007/container_1484132267957_0007_01_10/hs_err_pid26662.log
> Compiled method (nm) 1869778  903 n   org.rocksdb.RocksDB::remove 
> (native)
>  total in heap  [0x7f91b40b9dd0,0x7f91b40ba150] = 896
>  relocation [0x7f91b40b9ef0,0x7f91b40b9f48] = 88
>  main code  [0x7f91b40b9f60,0x7f91b40ba150] = 496
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.sun.com/bugreport/crash.jsp
> # The crash happened outside the Java Virtual Machine in native code.
> # See problematic frame for where to report the bug.
> #
> {code}







[jira] [Closed] (FLINK-10494) Rename 'JobManager' to 'JobMaster' for some classes in JobMaster folder

2018-10-11 Thread JIN SUN (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JIN SUN closed FLINK-10494.
---
Resolution: Won't Fix

Closed as suggested.

> Rename 'JobManager' to 'JobMaster' for some classes in JobMaster folder
> ---
>
> Key: FLINK-10494
> URL: https://issues.apache.org/jira/browse/FLINK-10494
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Affects Versions: 1.6.0, 1.7.0
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Some names in 
> "flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster" folder are 
> confusing, we should rename it to JobMaster.
>  
>  * JobManagerRunner -> JobMasterRunner
>  * JobManagerGateway -> JobMasterGateway
>  * JobManagerSharedServices -> JobMasterSharedServices
>  * JobManagerException -> JobMasterException





[jira] [Commented] (FLINK-8424) [Cassandra Connector] Update Cassandra version to last one

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647005#comment-16647005
 ] 

ASF GitHub Bot commented on FLINK-8424:
---

bmeriaux commented on issue #6715: [FLINK-8424] update cassandra and driver 
version to latest
URL: https://github.com/apache/flink/pull/6715#issuecomment-429109568
 
 
   OK, thanks for the info on the versioning policy. So will this be merged 
into master?




> [Cassandra Connector] Update Cassandra version to last one
> --
>
> Key: FLINK-8424
> URL: https://issues.apache.org/jira/browse/FLINK-8424
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Joao Boto
>Priority: Major
>  Labels: pull-request-available
>
> The Cassandra connector is using an outdated version.
> This is to upgrade the Cassandra version to something newer.
> https://git1-us-west.apache.org/repos/asf?p=cassandra.git;a=blob_plain;f=CHANGES.txt;hb=refs/tags/cassandra-3.11.1







[jira] [Created] (FLINK-10532) Broken links in documentation

2018-10-11 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10532:


 Summary: Broken links in documentation
 Key: FLINK-10532
 URL: https://issues.apache.org/jira/browse/FLINK-10532
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.7.0
Reporter: Chesnay Schepler
 Fix For: 1.7.0


https://travis-ci.org/apache/flink/builds/440115490#L599
{code:java}
http://localhost:4000/dev/stream/operators.html:
Remote file does not exist -- broken link!!!
--
http://localhost:4000/dev/table/streaming/sql.html:
Remote file does not exist -- broken link!!!
http://localhost:4000/dev/table/streaming.html:
Remote file does not exist -- broken link!!!{code}





[jira] [Commented] (FLINK-9943) Support TaskManagerMetricQueryServicePaths msg in JobManager Actor

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646923#comment-16646923
 ] 

ASF GitHub Bot commented on FLINK-9943:
---

isunjin commented on issue #6429: [FLINK-9943] Support 
TaskManagerMetricQueryServicePaths msg in JobManager Actor
URL: https://github.com/apache/flink/pull/6429#issuecomment-429081228
 
 
   could you also close the JIRA? @chuanlei 




> Support TaskManagerMetricQueryServicePaths msg in JobManager Actor
> --
>
> Key: FLINK-9943
> URL: https://issues.apache.org/jira/browse/FLINK-9943
> Project: Flink
>  Issue Type: New Feature
>  Components: Core
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Chuanlei Ni
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The reasons are as follows:
>  # AkkaJobManagerGateway wraps the jm actor ref to support this 
> functionality by first requesting RegisteredTaskManagers and then asking each 
> task manager actor for its metric query service path one by one. That 
> procedure wastes resources; it will be more efficient if we support this 
> functionality in the jm actor.
>  # We can expose the Flink metric system directly to external systems (such 
> as the Flink client and the like) to support more features in the future. For 
> now, the metric system is only partially exposed because Instance cannot (and 
> should not) be transferred remotely. This feature will make metrics exposure 
> consistent.







[jira] [Updated] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup

2018-10-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10516:
---
Labels: pull-request-available  (was: )






[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646859#comment-16646859
 ] 

ASF GitHub Bot commented on FLINK-10516:


yanyan300300 opened a new pull request #6828: [FLINK-10516] [yarn] fix 
YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink 
Configuration during setup
URL: https://github.com/apache/flink/pull/6828
 
 
   ## What is the purpose of the change
   
   This pull request makes YarnApplicationMasterRunner explicitly initialize 
FileSystem with the Flink Configuration, so that HadoopFileSystem can pick up 
the custom HDFS configuration (e.g. core-site.xml under fs.hdfs.hadoopconf) in 
Flink 1.4.
   
   ## Verifying this change
   This change added tests and can be verified as follows:
 - Added a unit test to verify that FileSystem.initialize() is called with the 
correct Flink Configuration when YarnApplicationMasterRunner.run() is called
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: yes
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
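A minimal, hypothetical sketch of the initialize-before-use pattern this PR enforces: a process-wide registry that must be seeded with the effective configuration before any filesystem settings are resolved, otherwise defaults are silently used. `FileSystemRegistry`, `initialize`, and `setting` are illustrative names, not Flink's API (Flink's real entry point is `FileSystem.initialize(Configuration)`).

```java
import java.util.Collections;
import java.util.Map;

/**
 * Hypothetical sketch of the initialize-before-use pattern the PR describes:
 * a process-wide holder must be seeded with the effective configuration
 * before anything reads filesystem settings, otherwise defaults (without,
 * e.g., fs.hdfs.hadoopconf) are silently used. Illustrative names only.
 */
final class FileSystemRegistry {

    private static volatile Map<String, String> config = Collections.emptyMap();

    private FileSystemRegistry() {}

    /** Must be called once, early in setup, with the full Flink configuration. */
    static void initialize(Map<String, String> flinkConfig) {
        config = Map.copyOf(flinkConfig);
    }

    /** Later lookups see whatever configuration was installed at init time. */
    static String setting(String key, String fallback) {
        return config.getOrDefault(key, fallback);
    }
}
```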











[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646800#comment-16646800
 ] 

ASF GitHub Bot commented on FLINK-10423:


StefanRRichter commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224531500
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A monitor which pulls {@link RocksDB} native metrics
+ * and forwards them to Flink's metric group. All metrics are
+ * unsigned longs and are reported at the column family level.
+ */
+@Internal
+public class RocksDBNativeMetricMonitor implements Closeable {
+
+   private final CloseableRegistry registeredGauges;
+
+   private final RocksDB db;
+
+   private final ResourceGuard.Lease lease;
+
+   private final RocksDBNativeMetricOptions options;
+
+   private final MetricGroup metricGroup;
+
+   RocksDBNativeMetricMonitor(
+   @Nonnull RocksDB db,
+   @Nonnull ResourceGuard guard,
+   @Nonnull RocksDBNativeMetricOptions options,
+   @Nonnull MetricGroup metricGroup
+   ) throws IOException {
+   this.db = db;
+   this.lease = guard.acquireResource();
+   this.options = options;
+   this.metricGroup = metricGroup;
+
+   this.registeredGauges = new CloseableRegistry();
+   }
+
+   /**
+* Register gauges to pull native metrics for the column family.
+* @param columnFamilyName group name for the new gauges
+* @param handle native handle to the column family
+*/
+   void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle 
handle) {
+   try {
+   MetricGroup group = 
metricGroup.addGroup(columnFamilyName);
+
+   for (String property : options.getProperties()) {
+   RocksDBNativeMetricView gauge = new 
RocksDBNativeMetricView(
+   property,
+   handle,
+   db
+   );
+
+   group.gauge(property, gauge);
+   registeredGauges.registerCloseable(gauge);
+   }
+   } catch (IOException e) {
+   throw new FlinkRuntimeException("Unable to register 
native metrics with RocksDB", e);
+   }
+   }
+
+   @Override
+   public void close() {
+   IOUtils.closeQuietly(registeredGauges);
+   IOUtils.closeQuietly(lease);
+   }
+
+   static class RocksDBNativeMetricView implements Gauge<Long>, View, Closeable {
+   private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBNativeMetricView.class);
+
+   private final String property;
+
+   private final ColumnFamilyHandle handle;
+
+   private final RocksDB db;
+
+   private volatile boolean open;
+
+   private long value;
+
+   private RocksDBNativeMetricView(
+   @Nonnull String property,
+  

[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646802#comment-16646802
 ] 

ASF GitHub Bot commented on FLINK-10423:


StefanRRichter commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224529147
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -606,6 +627,17 @@ private RocksDB openDB(
Preconditions.checkState(1 + 
stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
"Not all requested column family handles have been 
created");
 
+   if (this.metricOptions.isEnabled()) {
+   this.nativeMetricMonitor = new 
RocksDBNativeMetricMonitor(
+   dbRef,
+   rocksDBResourceGuard,
+   metricOptions,
+   operatorMetricGroup
+   );
+
+   
this.cancelStreamRegistry.registerCloseable(nativeMetricMonitor);
 
 Review comment:
   I think this is not a good way to achieve a clean shutdown. Instead, I 
would suggest closing this in the `dispose()` method.
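The close-on-dispose idea suggested here can be sketched with a tiny registry that closes registered resources in reverse registration order during dispose(). This is a simplified stand-in under assumed names, not Flink's actual CloseableRegistry.

```java
import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Minimal sketch of the pattern the reviewer suggests: resources registered
 * during setup are closed together during dispose(), in reverse registration
 * order. It mirrors the idea of Flink's CloseableRegistry but is a
 * simplified, hypothetical stand-in, not the actual class.
 */
final class DisposableResources {

    private final Deque<Closeable> resources = new ArrayDeque<>();

    void register(Closeable resource) {
        resources.push(resource); // LIFO, so dispose() closes in reverse order
    }

    /** Called from the backend's dispose() for a deterministic shutdown. */
    void dispose() {
        while (!resources.isEmpty()) {
            try {
                resources.pop().close();
            } catch (Exception e) {
                // best-effort: keep closing the remaining resources
            }
        }
    }
}
```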




> Forward RocksDB native metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, State Backends, Checkpointing
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables,  etc that would be useful to users wishing 
> greater insight what rocksdb is doing. This work is inspired heavily by the 
> comments on this rocksdb issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)





[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646801#comment-16646801
 ] 

ASF GitHub Bot commented on FLINK-10423:


StefanRRichter commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224534588
 
 


[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646803#comment-16646803
 ] 

ASF GitHub Bot commented on FLINK-10423:


StefanRRichter commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224533004
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A monitor which pulls {@link RocksDB} native metrics
+ * and forwards them to Flink's metric group. All metrics are
+ * unsigned longs and are reported at the column family level.
+ */
+@Internal
+public class RocksDBNativeMetricMonitor implements Closeable {
+
+   private final CloseableRegistry registeredGauges;
+
+   private final RocksDB db;
+
+   private final ResourceGuard.Lease lease;
 
 Review comment:
   I think a lease is not required if this object is closed at the beginning of 
`RocksDBKeyedStateBackend.dispose()`. The lease is rather required for parallel 
threads that use the db, since we don't know when they are done using the db in 
`dispose()`.
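The lease semantics discussed above can be illustrated with a simplified stand-in for Flink's `org.apache.flink.util.ResourceGuard`. The `SimpleResourceGuard` class below is a hypothetical model written for this sketch, not Flink code:

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Simplified model of Flink's ResourceGuard: close() must not complete
 * while any lease is still outstanding, so parallel users of a guarded
 * resource (e.g. the RocksDB instance) can finish safely.
 */
class SimpleResourceGuard implements Closeable {

	private final AtomicInteger leases = new AtomicInteger();
	private volatile boolean closed;

	/** A lease keeps the guarded resource alive until it is closed. */
	class Lease implements Closeable {
		@Override
		public void close() {
			synchronized (SimpleResourceGuard.this) {
				leases.decrementAndGet();
				SimpleResourceGuard.this.notifyAll();
			}
		}
	}

	synchronized Lease acquireResource() {
		if (closed) {
			throw new IllegalStateException("guard already closed");
		}
		leases.incrementAndGet();
		return new Lease();
	}

	/** Blocks until all outstanding leases have been released. */
	@Override
	public synchronized void close() {
		closed = true;
		while (leases.get() > 0) {
			try {
				wait();
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				return;
			}
		}
	}

	static boolean demo() {
		SimpleResourceGuard guard = new SimpleResourceGuard();
		Lease lease = guard.acquireResource();
		// If the monitor is always closed before the db is disposed, its
		// lease is already released here and guard.close() returns at once.
		lease.close();
		guard.close();
		return true;
	}
}
```

Under this model, a monitor holding a lease would block `close()` on the guard until the lease is released — which is why closing the monitor before disposing the db makes the lease unnecessary, as the review comment argues.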


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Forward RocksDB native metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, State Backends, Checkpointing
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables,  etc that would be useful to users wishing 
> greater insight what rocksdb is doing. This work is inspired heavily by the 
> comments on this rocksdb issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)
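The PR attached to this ticket pairs Flink's `Gauge` with its `View` interface so that a native property is read on the reporter's update interval rather than on every metric access. A minimal self-contained sketch of that pattern (the interfaces below are simplified stand-ins for Flink's, and the supplier replaces the actual RocksDB property read):

```java
import java.util.function.LongSupplier;

/** Simplified stand-ins for Flink's metric interfaces. */
interface Gauge<T> { T getValue(); }
interface View { void update(); }

/**
 * Caches the last value read from a native property supplier. A metric
 * registry calls update() periodically (Flink's view updater runs every
 * few seconds) and getValue() when reporting, so the potentially
 * expensive native read happens at a bounded rate.
 */
class NativeMetricView implements Gauge<Long>, View {

	// Stands in for a call like db.getProperty(handle, property).
	private final LongSupplier nativeProperty;

	private volatile long value;

	NativeMetricView(LongSupplier nativeProperty) {
		this.nativeProperty = nativeProperty;
	}

	@Override
	public void update() {
		value = nativeProperty.getAsLong();
	}

	@Override
	public Long getValue() {
		return value;
	}
}
```

The cached-value design keeps reporting cheap: only `update()` touches the native layer, and `getValue()` returns the last sampled value.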



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646804#comment-16646804
 ] 

ASF GitHub Bot commented on FLINK-10423:


StefanRRichter commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224536940
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1400,7 +1433,8 @@ private ColumnFamilyHandle createColumnFamily(String stateName) {
	ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(nameBytes, columnOptions);
 
	try {
-		return db.createColumnFamily(columnDescriptor);
+		ColumnFamilyHandle handle = db.createColumnFamily(columnDescriptor);
+		return handle;
 
 Review comment:
   Unrelated and could be reverted.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Forward RocksDB native metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, State Backends, Checkpointing
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables,  etc that would be useful to users wishing 
> greater insight what rocksdb is doing. This work is inspired heavily by the 
> comments on this rocksdb issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor

2018-10-11 Thread GitBox
StefanRRichter commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224531500
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A monitor which pulls {@link RocksDB} native metrics
+ * and forwards them to Flink's metric group. All metrics are
+ * unsigned longs and are reported at the column family level.
+ */
+@Internal
+public class RocksDBNativeMetricMonitor implements Closeable {
+
+   private final CloseableRegistry registeredGauges;
+
+   private final RocksDB db;
+
+   private final ResourceGuard.Lease lease;
+
+   private final RocksDBNativeMetricOptions options;
+
+   private final MetricGroup metricGroup;
+
+   RocksDBNativeMetricMonitor(
+   @Nonnull RocksDB db,
+   @Nonnull ResourceGuard guard,
+   @Nonnull RocksDBNativeMetricOptions options,
+   @Nonnull MetricGroup metricGroup
+   ) throws IOException {
+   this.db = db;
+   this.lease = guard.acquireResource();
+   this.options = options;
+   this.metricGroup = metricGroup;
+
+   this.registeredGauges = new CloseableRegistry();
+   }
+
+   /**
+* Register gauges to pull native metrics for the column family.
+* @param columnFamilyName group name for the new gauges
+* @param handle native handle to the column family
+*/
+   void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) {
+   try {
+   MetricGroup group = metricGroup.addGroup(columnFamilyName);
+
+   for (String property : options.getProperties()) {
+   RocksDBNativeMetricView gauge = new RocksDBNativeMetricView(
+   property,
+   handle,
+   db
+   );
+
+   group.gauge(property, gauge);
+   registeredGauges.registerCloseable(gauge);
+   }
+   } catch (IOException e) {
+   throw new FlinkRuntimeException("Unable to register native metrics with RocksDB", e);
+   }
+   }
+
+   @Override
+   public void close() {
+   IOUtils.closeQuietly(registeredGauges);
+   IOUtils.closeQuietly(lease);
+   }
+
+   static class RocksDBNativeMetricView implements Gauge<Long>, View, Closeable {
+   private static final Logger LOG = LoggerFactory.getLogger(RocksDBNativeMetricView.class);
+
+   private final String property;
+
+   private final ColumnFamilyHandle handle;
+
+   private final RocksDB db;
+
+   private volatile boolean open;
+
+   private long value;
+
+   private RocksDBNativeMetricView(
+   @Nonnull String property,
+   @Nonnull ColumnFamilyHandle handle,
+   @Nonnull RocksDB db
+   ) {
+   this.property = property;
+   this.handle = handle;
+   this.db = db;
+


[GitHub] StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor

2018-10-11 Thread GitBox
StefanRRichter commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224534588
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##

[GitHub] StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor

2018-10-11 Thread GitBox
StefanRRichter commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224529147
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -606,6 +627,17 @@ private RocksDB openDB(
	Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
		"Not all requested column family handles have been created");
 
+   if (this.metricOptions.isEnabled()) {
+   this.nativeMetricMonitor = new RocksDBNativeMetricMonitor(
+   dbRef,
+   rocksDBResourceGuard,
+   metricOptions,
+   operatorMetricGroup
+   );
+
+   this.cancelStreamRegistry.registerCloseable(nativeMetricMonitor);
 
 Review comment:
   I think this is not a good way for a clean shutdown. Instead I would suggest 
to close this in the `dispose()` method.
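The shutdown ordering suggested here can be sketched as follows. `ToyBackend`, `SilentCloseable`, and the field names are stand-ins modelled on the review discussion, not the real `RocksDBKeyedStateBackend` members:

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal close() abstraction without checked exceptions, for the sketch. */
interface SilentCloseable {
	void close();
}

/** Toy backend illustrating the suggested shutdown order in dispose(). */
class ToyBackend {

	final List<String> closedOrder = new ArrayList<>();

	// Stand-ins for the backend's nativeMetricMonitor and RocksDB handle.
	private final SilentCloseable nativeMetricMonitor = () -> closedOrder.add("monitor");
	private final SilentCloseable db = () -> closedOrder.add("db");

	/**
	 * Closing the metric monitor at the beginning of dispose() guarantees
	 * that no gauge reads native properties after the db is released —
	 * which is why, per the review comments, no lease is strictly needed.
	 */
	void dispose() {
		nativeMetricMonitor.close();
		db.close();
	}
}
```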


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-10531) State TTL RocksDb backend end-to-end test end-to-end test failed on Travis

2018-10-11 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10531:
-

 Summary: State TTL RocksDb backend end-to-end test end-to-end test 
failed on Travis
 Key: FLINK-10531
 URL: https://issues.apache.org/jira/browse/FLINK-10531
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.6.1
Reporter: Till Rohrmann


The {{State TTL RocksDb backend end-to-end test}} end-to-end test failed on 
Travis.

https://travis-ci.org/apache/flink/jobs/438226190
https://api.travis-ci.org/v3/job/438226190/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #6825: [hotfix] Fix typo in WindowedStream.

2018-10-11 Thread GitBox
asfgit closed pull request #6825: [hotfix] Fix typo in WindowedStream.
URL: https://github.com/apache/flink/pull/6825
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 871d86fc228..4f8243ccc3a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -1011,7 +1011,7 @@ public WindowedStream(KeyedStream input,
 * evaluation of the window for each key individually. The output of 
the window function is
 * interpreted as a regular non-windowed stream.
 *
-* Not that this function requires that all data in the windows is 
buffered until the window
+* Note that this function requires that all data in the windows is 
buffered until the window
 * is evaluated, as the function provides no means of incremental 
aggregation.
 *
 * @param function The window function.
@@ -1045,7 +1045,7 @@ public WindowedStream(KeyedStream input,
 * evaluation of the window for each key individually. The output of 
the window function is
 * interpreted as a regular non-windowed stream.
 *
-* Not that this function requires that all data in the windows is 
buffered until the window
+* Note that this function requires that all data in the windows is 
buffered until the window
 * is evaluated, as the function provides no means of incremental 
aggregation.
 *
 * @param function The window function.
@@ -1063,7 +1063,7 @@ public WindowedStream(KeyedStream input,
 * evaluation of the window for each key individually. The output of 
the window function is
 * interpreted as a regular non-windowed stream.
 *
-* Not that this function requires that all data in the windows is 
buffered until the window
+* Note that this function requires that all data in the windows is 
buffered until the window
 * is evaluated, as the function provides no means of incremental 
aggregation.
 *
 * @param function The window function.


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on issue #6825: [hotfix] Fix typo in WindowedStream.

2018-10-11 Thread GitBox
StefanRRichter commented on issue #6825: [hotfix] Fix typo in WindowedStream.
URL: https://github.com/apache/flink/pull/6825#issuecomment-429019467
 
 
   LGTM 👍 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state

2018-10-11 Thread Kostas Kloudas (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646693#comment-16646693
 ] 

Kostas Kloudas commented on FLINK-9592:
---

Hi [~kent2171]. Sorry for not posting the design doc earlier. 
That was a big mistake on my side.

Currently we have a new filesystem sink, called `StreamingFileSink`.
You can find it on the master branch.

I will have a look at the PR just to get an idea of the proposed solution, but
please open a discussion on the dev mailing list so that other members of 
the community can comment on it.

In the discussion you can include the PR, why you think this feature is 
interesting, and how you implemented it. 

This is the only way that this PR can be merged, as described in the contribution 
guidelines https://flink.apache.org/how-to-contribute.html. In addition you can 
follow the discussion about contributions in the dev ML (search for 
[DISCUSS] [Contributing]).
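For reference, the extension point proposed in this thread could look roughly like the sketch below. The interface name follows the snippet quoted in the ticket description, while `ToySink` and its methods are hypothetical illustrations, not the actual BucketingSink API:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical callback invoked when the sink moves a part file between states. */
interface FileStateChangeCallback {
	void onStateChange(String path, String fromState, String toState);
}

/** Toy sink showing where registered callbacks would be invoked. */
class ToySink {

	private final List<FileStateChangeCallback> callbacks = new ArrayList<>();

	void registerCallback(FileStateChangeCallback cb) {
		callbacks.add(cb);
	}

	// In the proposal this would be invoked from the places where the real
	// sink changes file state, e.g. closeCurrentPartFile() for opened ->
	// pending and notifyCheckpointComplete() for pending -> final.
	void moveFile(String path, String from, String to) {
		for (FileStateChangeCallback cb : callbacks) {
			cb.onStateChange(path, from, to);
		}
	}
}
```

Keeping the hook behind a small interface lets users attach meta-file creation (or any other side effect) without copy-pasting the sink's private state-transition logic.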

> Notify on moving file into pending/ final state
> ---
>
> Key: FLINK-9592
> URL: https://issues.apache.org/jira/browse/FLINK-9592
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Rinat Sharipov
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
>
> Hi mates, I got a proposal about functionality of BucketingSink.
>  
> During implementation of one of our tasks we got the following need - create 
> a meta-file, with the path and additional information about the file, created 
> by BucketingSink, when it’s been moved into final place.
> Unfortunately such behaviour is currently not available for us. 
>  
> We’ve implemented our own Sink, that provides an opportunity to register 
> notifiers, that will be called, when file state is changing, but current API 
> doesn’t allow us to add such behaviour using inheritance ...
>  
> It seems, that such functionality could be useful, and could be a part of 
> BucketingSink API
> What do you sink, should I make a PR ?
> Sincerely yours,
>  *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>  
> email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
> mobile: +7 (925) 416-37-26
> Clever{color:#4f8f00}DATA{color}
> make your data clever
>  
> 
>  
> Hi,
> I see that could be a useful feature. What exactly now is preventing you from 
> inheriting from BucketingSink? Maybe it would be just enough to make the 
> BucketingSink easier extendable.
> One thing now that could collide with such feature is that Kostas is now 
> working on larger BucketingSink rework/refactor. 
> Piotrek
> 
>  
> Hi guys, thx for your reply. 
> The following code info is actual for *release-1.5.0 tag, 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class*
>  
> For now, BucketingSink has the following lifecycle of files
>  
> When moving files from opened to pending state:
>  # on each item (*method* *invoke:434* *line*), we check that suitable bucket 
> exist, and contain opened file, in case, when opened file doesn’t exist, we 
> create one, and write item to it
>  # on each item (*method* *invoke:434* *line*), we check that suitable opened 
> file doesn’t exceed the limits, and if limits are exceeded, we close it and 
> move into pending state using *closeCurrentPartFile:568 line - private method*
>  # on each timer request (*onProcessingTime:482 line*), we check, if items 
> haven't been added to the opened file longer, than specified period of time, 
> we close it, using the same private method *closeCurrentPartFile:588 line*
>  
> So, the only way, that we have, is to call our hook from 
> *closeCurrentPartFile*, that is private, so we copy-pasted the current impl 
> and injected our logic there
>  
>  
> Files are moving from pending state into final, during checkpointing 
> lifecycle, in *notifyCheckpointComplete:657 line*, that is public, and 
> contains a lot of logic, including discovery of files in pending states, 
> synchronization of state access and it’s modification, etc … 
>  
> So we couldn’t override it, or call super method and add some logic, because 
> when current impl changes the state of files, it removes them from state, and 
> we don’t have any opportunity to know, 
> for which files state have been changed.
>  
> To solve such problem, we've created the following interface
>  
> /**
>  * The {@code FileStateChangeCallback} is used to perform any additional operations, when
>  * {@link BucketingSink} moves file from one state to another. For more information about state
>  * management of {@code Bucketing

[jira] [Commented] (FLINK-6625) Flink removes HA job data when reaching JobStatus.FAILED

2018-10-11 Thread Ufuk Celebi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646645#comment-16646645
 ] 

Ufuk Celebi commented on FLINK-6625:


{quote}Can't a failed checkpoint (unable to commit something, write somewhere, 
etc.) fail the job? Such an incomplete checkpoint would make HA-recovery 
impossible.{quote}

I think not. The scenario you describe would result in a {{FAILED}} checkpoint 
and the previous checkpoint would be the latest one in the HA store. 

> Flink removes HA job data when reaching JobStatus.FAILED
> 
>
> Key: FLINK-6625
> URL: https://issues.apache.org/jira/browse/FLINK-6625
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Till Rohrmann
>Priority: Major
>
> Currently, Flink removes all job related data (submitted {{JobGraph}} as well 
> as checkpoints) when it reaches a globally terminal state (including 
> {{JobStatus.FAILED}}). In high availability mode, this entails that all data 
> is removed from ZooKeeper and there is no way to recover the job by 
> restarting the cluster with the same cluster id.
> I think this is problematic, since an application might just have failed 
> because it has depleted its number of restart attempts. Also, the last 
> checkpoint information could be helpful when trying to find out why the job 
> has actually failed. I propose that we only remove job data when reaching the 
> state {{JobStatus.SUCCESS}} or {{JobStatus.CANCELED}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10529) Add flink-s3-fs-base to the connectors in the travis stage file.

2018-10-11 Thread Kostas Kloudas (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-10529.
--
Resolution: Fixed

Merged on master with 86f88f71253c8fc916de0dae1c0621072b0321c0.

> Add flink-s3-fs-base to the connectors in the travis stage file.
> 
>
> Key: FLINK-10529
> URL: https://issues.apache.org/jira/browse/FLINK-10529
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.7.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.7.0
>
>






[jira] [Closed] (FLINK-9752) Add an S3 RecoverableWriter

2018-10-11 Thread Kostas Kloudas (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-9752.
-
Resolution: Fixed

Merged on master with 88ea996dda16e340488a2d9f7516bf8337edefdd.

> Add an S3 RecoverableWriter
> ---
>
> Key: FLINK-9752
> URL: https://issues.apache.org/jira/browse/FLINK-9752
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Stephan Ewen
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2
>
>
> S3 offers persistence only when uploads are complete. That means at the end 
> of simple uploads and uploads of parts of a MultiPartUpload.
> We should implement a RecoverableWriter for S3 that does a MultiPartUpload 
> with a Part per checkpoint.
> Recovering the writer needs the MultiPartUploadID and the list of ETags of 
> previous parts.
> We need additional staging of data in Flink state to work around the fact that
>  - Parts in a MultiPartUpload must be at least 5MB
>  - Part sizes must be known up front. (Note that data can still be streamed 
> in the upload)
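The issue states that resuming a MultiPartUpload requires the upload ID plus the ETags of the parts already uploaded. A minimal holder for that recovery information could look like the sketch below; the class and method names are illustrative, not Flink's actual `RecoverableWriter` API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Illustrative sketch: everything needed to resume an S3 MultiPartUpload
 * after a failure, per the issue description. Names are assumptions.
 */
public class S3RecoverableSketch {
    private final String uploadId;               // MultiPartUpload ID from S3
    private final List<String> partETags = new ArrayList<>();

    public S3RecoverableSketch(String uploadId) {
        this.uploadId = uploadId;
    }

    /** Records the ETag S3 returned for one completed part upload. */
    public void addPart(String eTag) {
        partETags.add(eTag);
    }

    public String getUploadId() {
        return uploadId;
    }

    /** ETags of previous parts, in upload order, needed for recovery. */
    public List<String> getPartETags() {
        return Collections.unmodifiableList(partETags);
    }
}
```

This state (upload ID plus ordered ETags) is what would be checkpointed, with one part uploaded per checkpoint as the issue proposes.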





[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646603#comment-16646603
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

hequn8128 commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r224492754
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping in state a collection of probe and build records to process
+  * on the next watermark. The idea is that between watermarks we collect those elements,
+  * and once we are sure that there will be no more updates we emit the correct result and
+  * clean up the state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is
+  * defined as older than the current watermark. The build side is cleaned up in a similar
+  * fashion, however we always keep at least one record - the latest one - even if it's past
+  * the last watermark.
+  *
+  * One more trick is how emitting results and cleaning up are triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when the watermark exceeds this timer, that's when we emit
+  * and/or clean up the state). However, this would cause a huge number of registered timers.
+  * For example, with the following event times of accumulated probe records: {1, 2, 5, 8, 9},
+  * receiving Watermark(10) would trigger 5 separate timers for the same key. To avoid that,
+  * we always keep only one single registered timer for any given key, registered for the
+  * minimal value. Upon triggering it, we process all records with event times older than or
+  * equal to currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val NEXT_LEFT_INDEX_STATE_NAME = "next-index"
+  private val LEFT_STATE_NAME = "left"
+  private val RIGHT_STATE_NAME = "right"
+  private val REGISTERED_TIMER_STATE_NAME = "timer"
+  private val TIMERS_STATE_NAME = "timers"
+
+  private val rightRowtimeComparator = new 
RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* Mapping from artificial row index (generated by `nextLeftIndex`) into 
the left side `Row`.
+* We can not use List to accumulate Row

[GitHub] hequn8128 commented on a change in pull request #6776: [FLINK-9715][table] Support temporal join with event time

2018-10-11 Thread GitBox
hequn8128 commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r224492754
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping in state a collection of probe and build records to process
+  * on the next watermark. The idea is that between watermarks we collect those elements,
+  * and once we are sure that there will be no more updates we emit the correct result and
+  * clean up the state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is
+  * defined as older than the current watermark. The build side is cleaned up in a similar
+  * fashion, however we always keep at least one record - the latest one - even if it's past
+  * the last watermark.
+  *
+  * One more trick is how emitting results and cleaning up are triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when the watermark exceeds this timer, that's when we emit
+  * and/or clean up the state). However, this would cause a huge number of registered timers.
+  * For example, with the following event times of accumulated probe records: {1, 2, 5, 8, 9},
+  * receiving Watermark(10) would trigger 5 separate timers for the same key. To avoid that,
+  * we always keep only one single registered timer for any given key, registered for the
+  * minimal value. Upon triggering it, we process all records with event times older than or
+  * equal to currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val NEXT_LEFT_INDEX_STATE_NAME = "next-index"
+  private val LEFT_STATE_NAME = "left"
+  private val RIGHT_STATE_NAME = "right"
+  private val REGISTERED_TIMER_STATE_NAME = "timer"
+  private val TIMERS_STATE_NAME = "timers"
+
+  private val rightRowtimeComparator = new 
RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* Mapping from artificial row index (generated by `nextLeftIndex`) into 
the left side `Row`.
+* We can not use List to accumulate Rows, because we need efficient 
deletes of the oldest rows.
+*
+* TODO: this could be OrderedMultiMap[Jlong, Row] indexed by row's 
timestamp, to avoid
+* full map traversals (if we have lots of rows on the state that exceed 
`currentWatermark`).
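The timer-coalescing idea in the scaladoc above (one registered timer per key, set to the minimal buffered event time, processing everything up to the watermark when it fires) can be modeled outside Flink with a sorted buffer. This is a toy model under stated assumptions, not the operator's actual timer-service code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/**
 * Toy model of coalesced event-time timers: instead of one timer per
 * record, keep a single timer per key registered for the earliest
 * buffered event time; on firing, emit every record whose event time
 * is <= the current watermark.
 */
public class TimerCoalescingSketch {
    // buffered records keyed by event time (one record per time, for simplicity)
    private final TreeMap<Long, String> buffer = new TreeMap<>();
    private Long registeredTimer; // at most one timer: the minimal event time

    public void collect(long eventTime, String record) {
        buffer.put(eventTime, record);
        if (registeredTimer == null || eventTime < registeredTimer) {
            registeredTimer = eventTime; // coalesce: keep only the earliest
        }
    }

    /** Fires at most one timer per watermark, emitting all ripe records. */
    public List<String> onWatermark(long watermark) {
        List<String> emitted = new ArrayList<>();
        if (registeredTimer == null || registeredTimer > watermark) {
            return emitted; // the single timer is not ripe yet
        }
        while (!buffer.isEmpty() && buffer.firstKey() <= watermark) {
            emitted.add(buffer.pollFirstEntry().getValue());
        }
        // re-register for the new minimum, if anything is still buffered
        registeredTimer = buffer.isEmpty() ? null : buffer.firstKey();
        return emitted;
    }
}
```

With event times {1, 2, 5, 8, 9} buffered, a watermark of 10 triggers a single timer that drains all five records, instead of five separate timer firings for the same key.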

[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646580#comment-16646580
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224486164
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ##
 @@ -204,23 +206,113 @@ public void go() throws Exception {
Thread.sleep(5);
}
 
-   Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
completableFuture.complete(null);
-   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
}
 
-   private static class TestCassandraSink extends 
CassandraSinkBase {
+   @Test(timeout = 5000)
+   public void testAcquireOnInvoke() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.complete(null);
+
+   casSinkFunc.close();
+   }
+
+   @Test(timeout = 5000)
+   public void testReleaseOnSuccess() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.complete(null);
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.close();
+   }
+
+   @Test(timeout = 5000)
+   public void testReleaseOnFailure() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.completeExceptionally(new RuntimeException());
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+   }
+
+   @Test(timeout = 5000)
+   public void testTimeoutExceptionOnInvoke() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(0, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(CompletableFuture.completedFuture(null)));
 
+   try {
+   casSinkFunc.invoke("hello");
+   Assert.fail("Sending value should have experienced a 
TimeoutException");
+   } catc

[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646574#comment-16646574
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224475206
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ##
 @@ -354,6 +359,23 @@ protected Cluster buildCluster(Cluster.Builder builder) {
return this;
}
 
+   /**
+* Sets the maximum allowed number of concurrent requests for 
this sink.
+*
+* This call has no effect if {@link 
CassandraSinkBuilder#enableWriteAheadLog()} is called.
+*
+* @param maxConcurrentRequests maximum number of concurrent 
requests allowed
+* @param timeout timeout duration when acquiring a permit to 
execute
+* @param unit timeout unit when acquiring a permit to execute
+* @return this builder
+*/
+   public CassandraSinkBuilder setMaxConcurrentRequests(int 
maxConcurrentRequests, long timeout, TimeUnit unit) {
 
 Review comment:
   I would also add a shorthand with the default max timeout, basically blocking 
the task thread:
   `setMaxConcurrentRequests(int maxConcurrentRequests);`
   In general, I think users would be more interested in the blocking-indefinitely 
case, which just propagates the backpressure upstream and can also be seen in the 
UI. As I understand it, throwing a TimeoutException will now fail the job, which 
will be restarted, probably fail with a timeout again, and so on. So it is better 
just to block and avoid restarts.
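The two acquisition policies being discussed can be contrasted with a plain `java.util.concurrent.Semaphore`, the same primitive the PR uses: a bounded wait that fails with a `TimeoutException` versus an unbounded `acquire()` that simply blocks, propagating backpressure upstream. The class and method names here are illustrative, not the connector's API.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Sketch contrasting fail-fast and blocking permit acquisition. */
public class BackpressureSketch {
    private final Semaphore permits;

    public BackpressureSketch(int maxConcurrentRequests) {
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    /** Fail-fast variant: give up after the timeout, failing the record. */
    public void acquireOrTimeout(long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        if (!permits.tryAcquire(timeout, unit)) {
            throw new TimeoutException("no permit within " + timeout + " " + unit);
        }
    }

    /** Blocking variant: wait indefinitely, i.e. pure backpressure. */
    public void acquireBlocking() throws InterruptedException {
        permits.acquire();
    }

    /** Called from the async write's completion callback. */
    public void release() {
        permits.release();
    }

    public int availablePermits() {
        return permits.availablePermits();
    }
}
```

The fail-fast path turns a saturated sink into a job failure (and likely a restart loop), while the blocking path just slows the task thread, which is the behavior the reviewer argues most users want by default.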


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add async backpressure support to Cassandra Connector
> -
>
> Key: FLINK-9083
> URL: https://issues.apache.org/jira/browse/FLINK-9083
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jacob Park
>Assignee: Jacob Park
>Priority: Minor
>  Labels: pull-request-available
>
> As the CassandraSinkBase derivatives utilize async writes, they do not block 
> the task to introduce any backpressure.
> I am currently using a semaphore to provide backpressure support by blocking 
> at a maximum concurrent requests limit like how DataStax's Spark Cassandra 
> Connector functions: 
> [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18]
> This improvement has greatly improved the fault-tolerance of our Cassandra 
> Sink Connector implementation on Apache Flink in production. I would like to 
> contribute this feature back upstream.





[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646578#comment-16646578
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224487663
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ##
 @@ -204,23 +206,113 @@ public void go() throws Exception {
Thread.sleep(5);
}
 
-   Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
completableFuture.complete(null);
-   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
}
 
-   private static class TestCassandraSink extends 
CassandraSinkBase {
+   @Test(timeout = 5000)
+   public void testAcquireOnInvoke() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.complete(null);
+
+   casSinkFunc.close();
+   }
+
+   @Test(timeout = 5000)
+   public void testReleaseOnSuccess() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
 
 Review comment:
   The `CompletableFuture`s could be added to a list inside `invoke`, queried 
from `TestCassandraSink`, and completed later in any order. This way we could 
check multiple pending invokes.




> Add async backpressure support to Cassandra Connector
> -
>
> Key: FLINK-9083
> URL: https://issues.apache.org/jira/browse/FLINK-9083
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jacob Park
>Assignee: Jacob Park
>Priority: Minor
>  Labels: pull-request-available
>
> As the CassandraSinkBase derivatives utilize async writes, they do not block 
> the task to introduce any backpressure.
> I am currently using a semaphore to provide backpressure support by blocking 
> at a maximum concurrent requests limit like how DataStax's Spark Cassandra 
> Connector functions: 
> [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18]
> This improvement has greatly improved the fault-tolerance of our Cassandra 
> Sink Connector implementation on Apache Flink in production. I would like to 
> contribute this feature back upstream.





[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646581#comment-16646581
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224475996
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -43,70 +48,82 @@
  */
 public abstract class CassandraSinkBase extends RichSinkFunction 
implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
-   protected transient Cluster cluster;
-   protected transient Session session;
 
-   protected transient volatile Throwable exception;
-   protected transient FutureCallback callback;
+   //  Default Configurations 

+
+   /**
+* The default maximum number of concurrent requests. By default, 
{@code Integer.MAX_VALUE}.
+*/
+   public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 
Integer.MAX_VALUE;
 
 Review comment:
   Maybe a minor thing, but I would say these default constants belong in the 
builder, where they are actually used, and there they can serve as their own 
documentation.




> Add async backpressure support to Cassandra Connector
> -
>
> Key: FLINK-9083
> URL: https://issues.apache.org/jira/browse/FLINK-9083
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jacob Park
>Assignee: Jacob Park
>Priority: Minor
>  Labels: pull-request-available
>
> As the CassandraSinkBase derivatives utilize async writes, they do not block 
> the task to introduce any backpressure.
> I am currently using a semaphore to provide backpressure support by blocking 
> at a maximum concurrent requests limit like how DataStax's Spark Cassandra 
> Connector functions: 
> [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18]
> This improvement has greatly improved the fault-tolerance of our Cassandra 
> Sink Connector implementation on Apache Flink in production. I would like to 
> contribute this feature back upstream.





[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646576#comment-16646576
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224477067
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -43,70 +48,82 @@
  */
 public abstract class CassandraSinkBase extends RichSinkFunction 
implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
-   protected transient Cluster cluster;
-   protected transient Session session;
 
-   protected transient volatile Throwable exception;
-   protected transient FutureCallback callback;
+   //  Default Configurations 

+
+   /**
+* The default maximum number of concurrent requests. By default, 
{@code Integer.MAX_VALUE}.
+*/
+   public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 
Integer.MAX_VALUE;
+
+   /**
+* The default timeout duration when acquiring a permit to execute. By 
default, {@code Long.MAX_VALUE}.
+*/
+   public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = 
Long.MAX_VALUE;
+
+   /**
+* The default timeout unit when acquiring a permit to execute. By 
default, milliseconds.
+*/
+   public static final TimeUnit 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
+
+   // - Configuration Fields 
-
+
+   private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
+   private long maxConcurrentRequestsTimeout = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+   private TimeUnit maxConcurrentRequestsTimeoutUnit = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT;
 
 Review comment:
   These parameters can actually be final, because they are set only during 
construction. If they would bloat the constructor too much, I would rather 
create a separate immutable config class for them and all other possible sink 
base options internally.




> Add async backpressure support to Cassandra Connector
> -
>
> Key: FLINK-9083
> URL: https://issues.apache.org/jira/browse/FLINK-9083
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jacob Park
>Assignee: Jacob Park
>Priority: Minor
>  Labels: pull-request-available
>
> As the CassandraSinkBase derivatives utilize async writes, they do not block 
> the task to introduce any backpressure.
> I am currently using a semaphore to provide backpressure support by blocking 
> at a maximum concurrent requests limit like how DataStax's Spark Cassandra 
> Connector functions: 
> [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18]
> This improvement has greatly improved the fault-tolerance of our Cassandra 
> Sink Connector implementation on Apache Flink in production. I would like to 
> contribute this feature back upstream.





[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646583#comment-16646583
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224481690
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -43,70 +48,82 @@
  */
 public abstract class CassandraSinkBase extends RichSinkFunction 
implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
-   protected transient Cluster cluster;
-   protected transient Session session;
 
-   protected transient volatile Throwable exception;
-   protected transient FutureCallback<V> callback;
+   // ------------------------ Default Configurations ------------------------
+
+   /**
+* The default maximum number of concurrent requests. By default, 
{@code Integer.MAX_VALUE}.
+*/
+   public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 
Integer.MAX_VALUE;
+
+   /**
+* The default timeout duration when acquiring a permit to execute. By 
default, {@code Long.MAX_VALUE}.
+*/
+   public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = 
Long.MAX_VALUE;
+
+   /**
+* The default timeout unit when acquiring a permit to execute. By 
default, milliseconds.
+*/
+   public static final TimeUnit 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
+
+   // ------------------------ Configuration Fields ------------------------
+
+   private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
+   private long maxConcurrentRequestsTimeout = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+   private TimeUnit maxConcurrentRequestsTimeoutUnit = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT;
+
+   // ------------------------ Cassandra Fields ------------------------
 
private final ClusterBuilder builder;
 
-   private final AtomicInteger updatesPending = new AtomicInteger();
+   protected transient Cluster cluster;
+   protected transient Session session;
+
+   // ------------------------ Synchronization Fields ------------------------
+
+   private AtomicReference<Throwable> throwable;
+   private Semaphore semaphore;
+   private Phaser phaser;
 
CassandraSinkBase(ClusterBuilder builder) {
this.builder = builder;
ClosureCleaner.clean(builder, true);
}
 
-   @Override
-   public void open(Configuration configuration) {
-   this.callback = new FutureCallback<V>() {
-   @Override
-   public void onSuccess(V ignored) {
-   int pending = updatesPending.decrementAndGet();
-   if (pending == 0) {
-   synchronized (updatesPending) {
-   updatesPending.notifyAll();
-   }
-   }
-   }
+   // ------------------------ Sink Methods ------------------------
 
+   @Override
+   public void open(Configuration parameters) {
+   cluster = createCluster();
+   session = createSession();
+
+   throwable = new AtomicReference<>();
+   semaphore = new Semaphore(maxConcurrentRequests);
+   /*
+* A Phaser is a flexible and reusable synchronization barrier 
similar to CyclicBarrier and CountDownLatch.
+*
+* This Phaser is configured to support "1 + N" parties.
+*   - "1" for the CassandraSinkBase to arrive and to await at 
the Phaser during a flush() call.
+*   - "N" for the varying number of invoke() calls that 
register and de-register with the Phaser.
+*
+* The Phaser awaits the completion of the advancement of a 
phase prior to returning from a register() call.
+* This behavior ensures that no backlogged invoke() calls 
register to execute while the Semaphore's permits
+* are being released during a flush() call.
+*/
+   phaser = new Phaser(1) {
 
 Review comment:
   Phaser solution looks good. I am wondering whether we need such low level 
approach. `send` and `snapshotState` are called synchronously. We could have a 
concurrent set of futures and use `FutureUtil.waitForAll` to sync in flush. 
This looks simpler to me. Is there a particular performance reason to use the 
`Phaser`?
   
   I would also abstract semaphore/Phaser/futures/addFuture/waitForAll/error 
away into another reusable and pluggable component. It can be also tested 
separately.
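For readers unfamiliar with Phaser, the "1 + N parties" pattern explained in the diff above can be demonstrated in isolation. This is a standalone JDK illustration with invented names, not the connector's code:

```java
import java.util.concurrent.Phaser;

/**
 * "1 + N parties" Phaser pattern (illustrative): one party is the flusher,
 * registered up front; each in-flight request registers on start and
 * deregisters when it completes.
 */
class PhaserFlushDemo {
	private final Phaser phaser = new Phaser(1); // the "1": the flusher's own party

	void requestStarted() {
		phaser.register();
	}

	void requestFinished() {
		phaser.arriveAndDeregister();
	}

	/** Arrives as the flusher and waits until every registered request has finished. */
	void flush() {
		phaser.arriveAndAwaitAdvance();
	}

	int inFlight() {
		return phaser.getRegisteredParties() - 1; // subtract the flusher's party
	}
}
```

With nothing in flight, flush() returns immediately; otherwise it blocks until every registered request has called requestFinished(). A Phaser is reusable across phases, which is why it suits repeated snapshotState() flushes.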

[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646582#comment-16646582
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224486780
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ##
 @@ -204,23 +206,113 @@ public void go() throws Exception {
Thread.sleep(5);
}
 
-   Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
completableFuture.complete(null);
-   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
}
 
-   private static class TestCassandraSink extends CassandraSinkBase<String, ResultSet> {
+   @Test(timeout = 5000)
+   public void testAcquireOnInvoke() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture<ResultSet> completableFuture = new CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.complete(null);
+
+   casSinkFunc.close();
+   }
+
+   @Test(timeout = 5000)
+   public void testReleaseOnSuccess() throws Exception {
 
 Review comment:
   Is not this case already covered with the following 2 tests in the beginning?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add async backpressure support to Cassandra Connector
> -
>
> Key: FLINK-9083
> URL: https://issues.apache.org/jira/browse/FLINK-9083
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jacob Park
>Assignee: Jacob Park
>Priority: Minor
>  Labels: pull-request-available
>
> As the CassandraSinkBase derivatives utilize async writes, they do not block 
> the task to introduce any backpressure.
> I am currently using a semaphore to provide backpressure support by blocking 
> at a maximum concurrent requests limit like how DataStax's Spark Cassandra 
> Connector functions: 
> [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18]
> This improvement has greatly improved the fault-tolerance of our Cassandra 
> Sink Connector implementation on Apache Flink in production. I would like to 
> contribute this feature back upstream.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646584#comment-16646584
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224484698
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -127,35 +144,112 @@ public void close() throws Exception {
}
 
@Override
-   public void initializeState(FunctionInitializationContext context) 
throws Exception {
-   }
+   public void initializeState(FunctionInitializationContext context) 
throws Exception {}
 
@Override
-   public void snapshotState(FunctionSnapshotContext ctx) throws Exception 
{
+   public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
checkAsyncErrors();
-   waitForPendingUpdates();
+   flush();
checkAsyncErrors();
}
 
-   private void waitForPendingUpdates() throws InterruptedException {
-   synchronized (updatesPending) {
-   while (updatesPending.get() > 0) {
-   updatesPending.wait();
+   @Override
+   public void invoke(IN value) throws Exception {
+   checkAsyncErrors();
+   tryAcquire();
+   final ListenableFuture<V> result = send(value);
+   Futures.addCallback(result, new FutureCallback<V>() {
+   @Override
+   public void onSuccess(V ignored) {
+   release();
}
+
+   @Override
+   public void onFailure(Throwable currentError) {
+   setAsyncErrors(currentError);
+   release();
+   }
+   });
+   }
+
+   // ------------------------ User-Defined Sink Methods ------------------------
+
+   public abstract ListenableFuture<V> send(IN value);
+
+   // ------------------------ Configuration Methods ------------------------
+
+   /**
+* Sets the maximum allowed number of concurrent requests for this sink.
+*
+* @param maxConcurrentRequests maximum number of concurrent requests 
allowed
+* @param timeout timeout duration when acquiring a permit to execute
+* @param unit timeout unit when acquiring a permit to execute
+*/
+   public void setMaxConcurrentRequests(int maxConcurrentRequests, long 
timeout, TimeUnit unit) {
+   Preconditions.checkArgument(maxConcurrentRequests >= 0, 
"maxConcurrentRequests cannot be negative.");
+   Preconditions.checkArgument(timeout >= 0, "timeout cannot be 
negative.");
+   this.maxConcurrentRequests = maxConcurrentRequests;
+   this.maxConcurrentRequestsTimeout = timeout;
+   this.maxConcurrentRequestsTimeoutUnit = unit;
+   }
+
+   // ------------------------ Cassandra Methods ------------------------
+
+   protected Cluster createCluster() {
 
 Review comment:
   can be private atm


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add async backpressure support to Cassandra Connector
> -
>
> Key: FLINK-9083
> URL: https://issues.apache.org/jira/browse/FLINK-9083
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jacob Park
>Assignee: Jacob Park
>Priority: Minor
>  Labels: pull-request-available
>
> As the CassandraSinkBase derivatives utilize async writes, they do not block 
> the task to introduce any backpressure.
> I am currently using a semaphore to provide backpressure support by blocking 
> at a maximum concurrent requests limit like how DataStax's Spark Cassandra 
> Connector functions: 
> [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18]
> This improvement has greatly improved the fault-tolerance of our Cassandra 
> Sink Connector implementation on Apache Flink in production. I would like to 
> contribute this feature back upstream.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
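The invoke()/snapshotState() error handling shown in the diff above — record the first asynchronous failure, then rethrow it on the caller's thread at the next check — can be sketched with JDK futures standing in for Guava's FutureCallback. The class name and the wrapping exception type here are illustrative assumptions, not the connector's code:

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Records the first asynchronous failure and rethrows it on the caller's
 * thread at the next check, mirroring the onFailure -> setAsyncErrors ->
 * checkAsyncErrors flow (illustrative names).
 */
class AsyncErrorHolder {
	private final AtomicReference<Throwable> firstError = new AtomicReference<>();

	void watch(CompletableFuture<?> future) {
		future.whenComplete((ignored, error) -> {
			if (error != null) {
				firstError.compareAndSet(null, error); // keep only the first failure
			}
		});
	}

	void checkAsyncErrors() throws IOException {
		Throwable error = firstError.getAndSet(null);
		if (error != null) {
			throw new IOException("Error while sending value.", error);
		}
	}
}
```

Checking before and after flush, as snapshotState() does in the diff, ensures failures that complete during the flush are also surfaced.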

[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646577#comment-16646577
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224484466
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -127,35 +144,112 @@ public void close() throws Exception {
}
 
@Override
-   public void initializeState(FunctionInitializationContext context) 
throws Exception {
-   }
+   public void initializeState(FunctionInitializationContext context) 
throws Exception {}
 
 Review comment:
   `throws Exception` can be removed


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add async backpressure support to Cassandra Connector
> -
>
> Key: FLINK-9083
> URL: https://issues.apache.org/jira/browse/FLINK-9083
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jacob Park
>Assignee: Jacob Park
>Priority: Minor
>  Labels: pull-request-available
>
> As the CassandraSinkBase derivatives utilize async writes, they do not block 
> the task to introduce any backpressure.
> I am currently using a semaphore to provide backpressure support by blocking 
> at a maximum concurrent requests limit like how DataStax's Spark Cassandra 
> Connector functions: 
> [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18]
> This improvement has greatly improved the fault-tolerance of our Cassandra 
> Sink Connector implementation on Apache Flink in production. I would like to 
> contribute this feature back upstream.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646573#comment-16646573
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224473345
 
 

 ##
 File path: docs/dev/connectors/cassandra.md
 ##
 @@ -72,10 +72,13 @@ The following configuration methods can be used:
 4. _setMapperOptions(MapperOptions options)_
 * Sets the mapper options that are used to configure the DataStax 
ObjectMapper.
 * Only applies when processing __POJO__ data types.
-5. _enableWriteAheadLog([CheckpointCommitter committer])_
+5. _setMaxConcurrentRequests(int maxConcurrentRequests, long timeout, TimeUnit 
unit)_
+* Sets the maximum allowed number of concurrent requests with a timeout 
for acquiring permits to execute.
+* Only applies when __enableWriteAheadLog()__ is not configured.
 
 Review comment:
   Potentially `CassandraTupleWriteAheadSink.sendValues` could be also 
throttled and send values in batches of concurrent requests instead of trying 
to flush at once all values accumulated between checkpoints. It could be useful 
the same way. Is it just matter of implementing effort or there are more 
concerns about this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add async backpressure support to Cassandra Connector
> -
>
> Key: FLINK-9083
> URL: https://issues.apache.org/jira/browse/FLINK-9083
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jacob Park
>Assignee: Jacob Park
>Priority: Minor
>  Labels: pull-request-available
>
> As the CassandraSinkBase derivatives utilize async writes, they do not block 
> the task to introduce any backpressure.
> I am currently using a semaphore to provide backpressure support by blocking 
> at a maximum concurrent requests limit like how DataStax's Spark Cassandra 
> Connector functions: 
> [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18]
> This improvement has greatly improved the fault-tolerance of our Cassandra 
> Sink Connector implementation on Apache Flink in production. I would like to 
> contribute this feature back upstream.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646579#comment-16646579
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224485975
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ##
 @@ -204,23 +206,113 @@ public void go() throws Exception {
Thread.sleep(5);
}
 
-   Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
completableFuture.complete(null);
-   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
}
 
-   private static class TestCassandraSink extends CassandraSinkBase<String, ResultSet> {
+   @Test(timeout = 5000)
+   public void testAcquireOnInvoke() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.complete(null);
+
+   casSinkFunc.close();
+   }
+
+   @Test(timeout = 5000)
+   public void testReleaseOnSuccess() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.complete(null);
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.close();
+   }
+
+   @Test(timeout = 5000)
+   public void testReleaseOnFailure() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.completeExceptionally(new RuntimeException());
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+   }
 
 Review comment:
   also `casSinkFunc.close();`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add async backpressure support to Cassandra Connector
> -
>
> Key: FLINK-9083
> URL: https://issues.apache.org/jira/browse/FLINK-9083
> Project: Flink
>  Issue Type: Improvement

[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646575#comment-16646575
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224486540
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ##
 @@ -204,23 +206,113 @@ public void go() throws Exception {
Thread.sleep(5);
}
 
-   Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
completableFuture.complete(null);
-   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
}
 
-   private static class TestCassandraSink extends CassandraSinkBase<String, ResultSet> {
+   @Test(timeout = 5000)
+   public void testAcquireOnInvoke() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.complete(null);
+
+   casSinkFunc.close();
+   }
+
+   @Test(timeout = 5000)
+   public void testReleaseOnSuccess() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.complete(null);
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.close();
+   }
+
+   @Test(timeout = 5000)
+   public void testReleaseOnFailure() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.completeExceptionally(new RuntimeException());
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+   }
+
+   @Test(timeout = 5000)
+   public void testTimeoutExceptionOnInvoke() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(0, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(CompletableFuture.completedFuture(null)));
 
+   try {
+   casSinkFunc.invoke("hello");
+   Assert.fail("Sending value should have experienced a 
TimeoutException");
+   } catch (TimeoutException e) {

[GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-10-11 Thread GitBox
azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224486780
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ##
 @@ -204,23 +206,113 @@ public void go() throws Exception {
Thread.sleep(5);
}
 
-   Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
completableFuture.complete(null);
-   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
}
 
-   private static class TestCassandraSink extends CassandraSinkBase<String, ResultSet> {
+   @Test(timeout = 5000)
+   public void testAcquireOnInvoke() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.complete(null);
+
+   casSinkFunc.close();
+   }
+
+   @Test(timeout = 5000)
+   public void testReleaseOnSuccess() throws Exception {
 
 Review comment:
   Is not this case already covered with the following 2 tests in the beginning?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-10-11 Thread GitBox
azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224481690
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -43,70 +48,82 @@
  */
 public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
-   protected transient Cluster cluster;
-   protected transient Session session;
 
-   protected transient volatile Throwable exception;
-   protected transient FutureCallback<V> callback;
+   // ------------------------ Default Configurations ------------------------
+
+   /**
+* The default maximum number of concurrent requests. By default, 
{@code Integer.MAX_VALUE}.
+*/
+   public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 
Integer.MAX_VALUE;
+
+   /**
+* The default timeout duration when acquiring a permit to execute. By 
default, {@code Long.MAX_VALUE}.
+*/
+   public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = 
Long.MAX_VALUE;
+
+   /**
+* The default timeout unit when acquiring a permit to execute. By 
default, milliseconds.
+*/
+   public static final TimeUnit 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
+
+   // ------------------------ Configuration Fields ------------------------
+
+   private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
+   private long maxConcurrentRequestsTimeout = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+   private TimeUnit maxConcurrentRequestsTimeoutUnit = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT;
+
+   // ------------------------ Cassandra Fields ------------------------
 
private final ClusterBuilder builder;
 
-   private final AtomicInteger updatesPending = new AtomicInteger();
+   protected transient Cluster cluster;
+   protected transient Session session;
+
+   // ------------------------ Synchronization Fields ------------------------
+
+   private AtomicReference<Throwable> throwable;
+   private Semaphore semaphore;
+   private Phaser phaser;
 
CassandraSinkBase(ClusterBuilder builder) {
this.builder = builder;
ClosureCleaner.clean(builder, true);
}
 
-   @Override
-   public void open(Configuration configuration) {
-   this.callback = new FutureCallback<V>() {
-   @Override
-   public void onSuccess(V ignored) {
-   int pending = updatesPending.decrementAndGet();
-   if (pending == 0) {
-   synchronized (updatesPending) {
-   updatesPending.notifyAll();
-   }
-   }
-   }
+   // ------------------------ Sink Methods ------------------------
 
+   @Override
+   public void open(Configuration parameters) {
+   cluster = createCluster();
+   session = createSession();
+
+   throwable = new AtomicReference<>();
+   semaphore = new Semaphore(maxConcurrentRequests);
+   /*
+* A Phaser is a flexible and reusable synchronization barrier 
similar to CyclicBarrier and CountDownLatch.
+*
+* This Phaser is configured to support "1 + N" parties.
+*   - "1" for the CassandraSinkBase to arrive and to await at 
the Phaser during a flush() call.
+*   - "N" for the varying number of invoke() calls that 
register and de-register with the Phaser.
+*
+* The Phaser awaits the completion of the advancement of a 
phase prior to returning from a register() call.
+* This behavior ensures that no backlogged invoke() calls 
register to execute while the Semaphore's permits
+* are being released during a flush() call.
+*/
+   phaser = new Phaser(1) {
 
 Review comment:
   Phaser solution looks good. I am wondering whether we need such low level 
approach. `send` and `snapshotState` are called synchronously. We could have a 
concurrent set of futures and use `FutureUtil.waitForAll` to sync in flush. 
This looks simpler to me. Is there a particular performance reason to use the 
`Phaser`?
   
   I would also abstract semaphore/Phaser/futures/addFuture/waitForAll/error 
away into another reusable and pluggable component. It can be also tested 
separately.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
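The alternative suggested in this review — keep a concurrent set of in-flight futures and wait for all of them in flush() — can be sketched with CompletableFuture.allOf standing in for Flink's `FutureUtil.waitForAll`. The names below are invented for illustration:

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Tracks in-flight futures in a concurrent set; flush() blocks until the
 * currently pending ones complete (illustrative; CompletableFuture.allOf
 * stands in for Flink's FutureUtil.waitForAll).
 */
class FutureSetFlusher {
	private final Set<CompletableFuture<?>> pending = ConcurrentHashMap.newKeySet();

	void track(CompletableFuture<?> future) {
		pending.add(future);
		// A future removes itself from the set once it completes.
		future.whenComplete((ignored, error) -> pending.remove(future));
	}

	/** Blocks until all currently pending futures have completed. */
	void flush() {
		CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
	}

	int pendingCount() {
		return pending.size();
	}
}
```

If any tracked future failed, join() surfaces the error as a CompletionException, so real code would still need the first-error bookkeeping discussed elsewhere in this thread.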

[GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-10-11 Thread GitBox
azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224477067
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -43,70 +48,82 @@
  */
 public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
-   protected transient Cluster cluster;
-   protected transient Session session;
 
-   protected transient volatile Throwable exception;
-   protected transient FutureCallback<V> callback;
+   // ------------------------ Default Configurations ------------------------
+
+   /**
+* The default maximum number of concurrent requests. By default, 
{@code Integer.MAX_VALUE}.
+*/
+   public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 
Integer.MAX_VALUE;
+
+   /**
+* The default timeout duration when acquiring a permit to execute. By 
default, {@code Long.MAX_VALUE}.
+*/
+   public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = 
Long.MAX_VALUE;
+
+   /**
+* The default timeout unit when acquiring a permit to execute. By 
default, milliseconds.
+*/
+   public static final TimeUnit 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
+
+   // ------------------------ Configuration Fields ------------------------
+
+   private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
+   private long maxConcurrentRequestsTimeout = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+   private TimeUnit maxConcurrentRequestsTimeoutUnit = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT;
 
 Review comment:
   These parameters can actually be final because they are set only during construction. If they would bloat the constructor too much, I would rather create a separate immutable config class for them and all other possible sink base options internally.
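A sketch of what such an immutable config object could look like; the class name `CassandraSinkOptions` and its accessors are hypothetical, not the actual PR code:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical immutable holder for the sink's concurrency options,
// so the fields in CassandraSinkBase can become final.
final class CassandraSinkOptions {
    private final int maxConcurrentRequests;
    private final long maxConcurrentRequestsTimeout;
    private final TimeUnit maxConcurrentRequestsTimeoutUnit;

    CassandraSinkOptions(int maxConcurrentRequests, long timeout, TimeUnit unit) {
        // mirror the preconditions of setMaxConcurrentRequests(...)
        if (maxConcurrentRequests < 0) {
            throw new IllegalArgumentException("maxConcurrentRequests cannot be negative.");
        }
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout cannot be negative.");
        }
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.maxConcurrentRequestsTimeout = timeout;
        this.maxConcurrentRequestsTimeoutUnit = unit;
    }

    int getMaxConcurrentRequests() {
        return maxConcurrentRequests;
    }

    long getMaxConcurrentRequestsTimeout() {
        return maxConcurrentRequestsTimeout;
    }

    TimeUnit getMaxConcurrentRequestsTimeoutUnit() {
        return maxConcurrentRequestsTimeoutUnit;
    }
}
```

The sink would then take one `CassandraSinkOptions` argument instead of three loose configuration fields.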


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-10-11 Thread GitBox
azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224484698
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -127,35 +144,112 @@ public void close() throws Exception {
}
 
@Override
-   public void initializeState(FunctionInitializationContext context) 
throws Exception {
-   }
+   public void initializeState(FunctionInitializationContext context) 
throws Exception {}
 
@Override
-   public void snapshotState(FunctionSnapshotContext ctx) throws Exception 
{
+   public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
checkAsyncErrors();
-   waitForPendingUpdates();
+   flush();
checkAsyncErrors();
}
 
-   private void waitForPendingUpdates() throws InterruptedException {
-   synchronized (updatesPending) {
-   while (updatesPending.get() > 0) {
-   updatesPending.wait();
+   @Override
+   public void invoke(IN value) throws Exception {
+   checkAsyncErrors();
+   tryAcquire();
+   final ListenableFuture<V> result = send(value);
+   Futures.addCallback(result, new FutureCallback<V>() {
+   @Override
+   public void onSuccess(V ignored) {
+   release();
}
+
+   @Override
+   public void onFailure(Throwable currentError) {
+   setAsyncErrors(currentError);
+   release();
+   }
+   });
+   }
+
+   // --------------------------- User-Defined Sink Methods ---------------------------
+
+   public abstract ListenableFuture<V> send(IN value);
+
+   // ------------------------ Configuration Methods ------------------------
+
+   /**
+* Sets the maximum allowed number of concurrent requests for this sink.
+*
+* @param maxConcurrentRequests maximum number of concurrent requests 
allowed
+* @param timeout timeout duration when acquiring a permit to execute
+* @param unit timeout unit when acquiring a permit to execute
+*/
+   public void setMaxConcurrentRequests(int maxConcurrentRequests, long 
timeout, TimeUnit unit) {
+   Preconditions.checkArgument(maxConcurrentRequests >= 0, 
"maxConcurrentRequests cannot be negative.");
+   Preconditions.checkArgument(timeout >= 0, "timeout cannot be 
negative.");
+   this.maxConcurrentRequests = maxConcurrentRequests;
+   this.maxConcurrentRequestsTimeout = timeout;
+   this.maxConcurrentRequestsTimeoutUnit = unit;
+   }
+
+   // --------------------------- Cassandra Methods ---------------------------
+
+   protected Cluster createCluster() {
 
 Review comment:
   can be private atm




[GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-10-11 Thread GitBox
azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224485975
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ##
 @@ -204,23 +206,113 @@ public void go() throws Exception {
Thread.sleep(5);
}
 
-   Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
completableFuture.complete(null);
-   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
}
 
-   private static class TestCassandraSink extends 
CassandraSinkBase {
+   @Test(timeout = 5000)
+   public void testAcquireOnInvoke() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.complete(null);
+
+   casSinkFunc.close();
+   }
+
+   @Test(timeout = 5000)
+   public void testReleaseOnSuccess() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.complete(null);
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.close();
+   }
+
+   @Test(timeout = 5000)
+   public void testReleaseOnFailure() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.completeExceptionally(new RuntimeException());
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+   }
 
 Review comment:
   also `casSinkFunc.close();`?




[GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-10-11 Thread GitBox
azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224475206
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ##
 @@ -354,6 +359,23 @@ protected Cluster buildCluster(Cluster.Builder builder) {
return this;
}
 
+   /**
+* Sets the maximum allowed number of concurrent requests for 
this sink.
+*
+* This call has no effect if {@link 
CassandraSinkBuilder#enableWriteAheadLog()} is called.
+*
+* @param maxConcurrentRequests maximum number of concurrent 
requests allowed
+* @param timeout timeout duration when acquiring a permit to 
execute
+* @param unit timeout unit when acquiring a permit to execute
+* @return this builder
+*/
+   public CassandraSinkBuilder setMaxConcurrentRequests(int 
maxConcurrentRequests, long timeout, TimeUnit unit) {
 
 Review comment:
   I would also add a shorthand with the default max timeout that basically blocks the task thread:
   `setMaxConcurrentRequests(int maxConcurrentRequests);`
   In general, I think users would be more interested in the blocking-indefinitely case, which simply propagates the back pressure upstream and can also be seen in the UI. As I understand it, throwing a TimeoutException now will fail the job, the job will be restarted, probably fail with a timeout again, and so on. Then it is better to just block and avoid restarts.
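A sketch of how such a shorthand could behave, built on a plain `java.util.concurrent.Semaphore`; the class `PermitThrottle` and the `timeout <= 0` sentinel are illustrative assumptions, not the connector's actual API:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Without an explicit timeout the sink blocks indefinitely on the
// semaphore, propagating back pressure upstream instead of failing the
// job with a TimeoutException.
class PermitThrottle {
    private final Semaphore semaphore;
    private final long timeout;   // <= 0 means "block indefinitely"
    private final TimeUnit unit;

    // Shorthand: block without a timeout.
    PermitThrottle(int maxConcurrentRequests) {
        this(maxConcurrentRequests, 0L, TimeUnit.MILLISECONDS);
    }

    PermitThrottle(int maxConcurrentRequests, long timeout, TimeUnit unit) {
        this.semaphore = new Semaphore(maxConcurrentRequests);
        this.timeout = timeout;
        this.unit = unit;
    }

    void acquire() throws InterruptedException, TimeoutException {
        if (timeout <= 0) {
            semaphore.acquire(); // back pressure: wait as long as needed
        } else if (!semaphore.tryAcquire(timeout, unit)) {
            throw new TimeoutException("Could not acquire permit in time.");
        }
    }

    void release() {
        semaphore.release();
    }
}
```

The blocking variant surfaces in Flink's web UI as back pressure on the upstream operator, while the timed variant fails fast.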




[GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-10-11 Thread GitBox
azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224473345
 
 

 ##
 File path: docs/dev/connectors/cassandra.md
 ##
 @@ -72,10 +72,13 @@ The following configuration methods can be used:
 4. _setMapperOptions(MapperOptions options)_
 * Sets the mapper options that are used to configure the DataStax 
ObjectMapper.
 * Only applies when processing __POJO__ data types.
-5. _enableWriteAheadLog([CheckpointCommitter committer])_
+5. _setMaxConcurrentRequests(int maxConcurrentRequests, long timeout, TimeUnit 
unit)_
+* Sets the maximum allowed number of concurrent requests with a timeout 
for acquiring permits to execute.
+* Only applies when __enableWriteAheadLog()__ is not configured.
 
 Review comment:
   Potentially, `CassandraTupleWriteAheadSink.sendValues` could also be throttled and send values in batches of concurrent requests, instead of trying to flush all values accumulated between checkpoints at once. It could be useful in the same way. Is it just a matter of implementation effort, or are there more concerns about this?
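A sketch of the batching idea: partition the accumulated values into bounded batches, so each batch caps the number of requests in flight at once. The helper below is illustrative, not part of `CassandraTupleWriteAheadSink`:

```java
import java.util.ArrayList;
import java.util.List;

// Split the values accumulated between checkpoints into batches of at
// most batchSize; the sink would await completion of each batch before
// issuing the next, bounding concurrent requests during sendValues().
class BatchSender {
    static <T> List<List<T>> partition(List<T> values, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < values.size(); i += batchSize) {
            int end = Math.min(i + batchSize, values.size());
            batches.add(new ArrayList<>(values.subList(i, end)));
        }
        return batches;
    }
}
```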




[GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-10-11 Thread GitBox
azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224484466
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -127,35 +144,112 @@ public void close() throws Exception {
}
 
@Override
-   public void initializeState(FunctionInitializationContext context) 
throws Exception {
-   }
+   public void initializeState(FunctionInitializationContext context) 
throws Exception {}
 
 Review comment:
   `throws Exception` can be removed




[GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-10-11 Thread GitBox
azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224486164
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ##
 @@ -204,23 +206,113 @@ public void go() throws Exception {
Thread.sleep(5);
}
 
-   Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
completableFuture.complete(null);
-   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
}
 
-   private static class TestCassandraSink extends 
CassandraSinkBase {
+   @Test(timeout = 5000)
+   public void testAcquireOnInvoke() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.complete(null);
+
+   casSinkFunc.close();
+   }
+
+   @Test(timeout = 5000)
+   public void testReleaseOnSuccess() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.complete(null);
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.close();
+   }
+
+   @Test(timeout = 5000)
+   public void testReleaseOnFailure() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.completeExceptionally(new RuntimeException());
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+   }
+
+   @Test(timeout = 5000)
+   public void testTimeoutExceptionOnInvoke() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(0, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(CompletableFuture.completedFuture(null)));
 
+   try {
+   casSinkFunc.invoke("hello");
+   Assert.fail("Sending value should have experienced a 
TimeoutException");
+   } catch (Exception e) {
+   Assert.assertTrue(e instanceof TimeoutException);
+   }
+   }
 
 Review comment:
   also `casSinkFunc.close();`?



[GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-10-11 Thread GitBox
azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224475996
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -43,70 +48,82 @@
  */
public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
-   protected transient Cluster cluster;
-   protected transient Session session;
 
-   protected transient volatile Throwable exception;
-   protected transient FutureCallback<V> callback;
+   // ------------------------ Default Configurations ------------------------
+
+   /**
+* The default maximum number of concurrent requests. By default, 
{@code Integer.MAX_VALUE}.
+*/
+   public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 
Integer.MAX_VALUE;
 
 Review comment:
   maybe minor thing, I would rather say that these default constants belong to 
the builder where they are actually used and can be documentation for 
themselves.




[GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-10-11 Thread GitBox
azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224486540
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ##
 @@ -204,23 +206,113 @@ public void go() throws Exception {
Thread.sleep(5);
}
 
-   Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
completableFuture.complete(null);
-   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
}
 
-   private static class TestCassandraSink extends 
CassandraSinkBase {
+   @Test(timeout = 5000)
+   public void testAcquireOnInvoke() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.complete(null);
+
+   casSinkFunc.close();
+   }
+
+   @Test(timeout = 5000)
+   public void testReleaseOnSuccess() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.complete(null);
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.close();
+   }
+
+   @Test(timeout = 5000)
+   public void testReleaseOnFailure() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.completeExceptionally(new RuntimeException());
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+   }
+
+   @Test(timeout = 5000)
+   public void testTimeoutExceptionOnInvoke() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(0, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(CompletableFuture.completedFuture(null)));
 
+   try {
+   casSinkFunc.invoke("hello");
+   Assert.fail("Sending value should have experienced a 
TimeoutException");
+   } catch (Exception e) {
+   Assert.assertTrue(e instanceof TimeoutException);
+   }
+   }
+
+   private static class TestCassandraSink extends 
CassandraSinkBase {
 
 Review comment:
   can be `AutoCloseable` to use with try-with-resources

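A sketch of the suggested pattern: making the test sink `AutoCloseable` lets the tests use try-with-resources, so `close()` runs even when an assertion fails mid-test. The class below is illustrative, not the PR's `TestCassandraSink`:

```java
// Minimal AutoCloseable test sink: try-with-resources guarantees
// close() is invoked regardless of how the test body exits.
class TestSink implements AutoCloseable {
    boolean closed = false;

    void invoke(String value) {
        // the real sink would issue an asynchronous request here
    }

    @Override
    public void close() {
        closed = true;
    }
}
```

Usage in a test:

```java
try (TestSink sink = new TestSink()) {
    sink.invoke("hello");
    // assertions here; close() still runs if one of them throws
}
```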
[GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-10-11 Thread GitBox
azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r224487663
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ##
 @@ -204,23 +206,113 @@ public void go() throws Exception {
Thread.sleep(5);
}
 
-   Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
completableFuture.complete(null);
-   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
}
 
-   private static class TestCassandraSink extends 
CassandraSinkBase {
+   @Test(timeout = 5000)
+   public void testAcquireOnInvoke() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
+
+   Assert.assertEquals(0, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+   completableFuture.complete(null);
+
+   casSinkFunc.close();
+   }
+
+   @Test(timeout = 5000)
+   public void testReleaseOnSuccess() throws Exception {
+   TestCassandraSink casSinkFunc = new TestCassandraSink();
+   casSinkFunc.setMaxConcurrentRequests(1, 5, 
TimeUnit.MILLISECONDS);
+
+   casSinkFunc.open(new Configuration());
+
+   CompletableFuture completableFuture = new 
CompletableFuture<>();
+
+   
casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture));
+
+   Assert.assertEquals(1, casSinkFunc.getAvailablePermits());
+   Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+
+   casSinkFunc.invoke("hello");
 
 Review comment:
   The `CompletableFuture`'s could be added to the list inside `invoke`, 
queried from `TestCassandraSink` and completed later in any order. This way we 
could check multiple pending invokes.
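A sketch of that test helper: capture one `CompletableFuture` per `invoke()` so the test can complete them later in any order and observe several pending requests at once. Class and method names are illustrative, not the actual PR code:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Records the future of every invocation; completing futures out of
// order lets a test observe the pending-request count at each step.
class CapturingSink {
    private final List<CompletableFuture<Void>> futures = new CopyOnWriteArrayList<>();
    private final AtomicInteger pending = new AtomicInteger();

    // Simulates the sink's invoke(): issue a request and track it.
    CompletableFuture<Void> invoke(String value) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.incrementAndGet();
        future.whenComplete((result, error) -> pending.decrementAndGet());
        futures.add(future);
        return future;
    }

    List<CompletableFuture<Void>> getFutures() {
        return futures;
    }

    int getPending() {
        return pending.get();
    }
}
```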




[jira] [Resolved] (FLINK-9788) ExecutionGraph Inconsistency prevents Job from recovering

2018-10-11 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9788.
--
Resolution: Fixed

Fixed via
1.7.0: 
https://github.com/apache/flink/commit/5ae68f85fa4182d94608583b133e8162c4f8f07d
1.6.2: 
https://github.com/apache/flink/commit/15a90891a840aff53260a8819e1120cd310f7336
1.5.5: 
https://github.com/apache/flink/commit/d13015e23c805e1aaefdc7d8037f2fa87ea74830

> ExecutionGraph Inconsistency prevents Job from recovering
> -
>
> Key: FLINK-9788
> URL: https://issues.apache.org/jira/browse/FLINK-9788
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.6.0
> Environment: Rev: 4a06160
> Hadoop 2.8.3
>Reporter: Gary Yao
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
> Attachments: jobmanager_5000.log
>
>
> Deployment mode: YARN job mode with HA
> After killing many TaskManagers in succession, the state of the 
> ExecutionGraph ran into an inconsistent state, which prevented job recovery. 
> The following stacktrace was logged in the JobManager log several hundred 
> times per second:
> {noformat}
> 2018-07-08 16:47:18,855 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job General purpose test job (37a794195840700b98feb23e99f7ea24) switched from state RESTARTING to RESTARTING.
> 2018-07-08 16:47:18,856 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Restarting 
> the job General purpose test job (37a794195840700b98feb23e99f7ea24).
> 2018-07-08 16:47:18,857 DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Resetting 
> execution vertex Source: Custom Source -> Timestamps/Watermarks (1/10) for 
> new execution.
> 2018-07-08 16:47:18,857 WARN  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Failed to 
> restart the job.
> java.lang.IllegalStateException: Cannot reset a vertex that is in 
> non-terminal state CREATED
> at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.resetForNewExecution(ExecutionVertex.java:610)
> at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.resetForNewExecution(ExecutionJobVertex.java:573)
> at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1251)
> at 
> org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
> at 
> org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {noformat}
> The resulting jobmanager log file was 4.7 GB in size. Find attached the first 
> 5000 lines of the log file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann closed pull request #6812: [BP-1.5][FLINK-9788] Fix ExecutionGraph inconsistency for global failures when restarting

2018-10-11 Thread GitBox
tillrohrmann closed pull request #6812: [BP-1.5][FLINK-9788] Fix ExecutionGraph 
inconsistency for global failures when restarting
URL: https://github.com/apache/flink/pull/6812
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index acb1e16fe71..0be1ff27420 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1151,18 +1151,7 @@ public void failGlobal(Throwable t) {
 				current == JobStatus.SUSPENDED ||
 				current.isGloballyTerminalState()) {
 				return;
-			}
-			else if (current == JobStatus.RESTARTING) {
-				// we handle 'failGlobal()' while in 'RESTARTING' as a safety net in case something
-				// has gone wrong in 'RESTARTING' and we need to re-attempt the restarts
-				initFailureCause(t);
-
-				final long globalVersionForRestart = incrementGlobalModVersion();
-				if (tryRestartOrFail(globalVersionForRestart)) {
-					return;
-				}
-			}
-			else if (transitionState(current, JobStatus.FAILING, t)) {
+			} else if (transitionState(current, JobStatus.FAILING, t)) {
 				initFailureCause(t);
 
 				// make sure no concurrent local or global actions interfere with the failover
@@ -1240,7 +1229,7 @@ public void restart(long expectedGlobalVersion) {
 					colGroups.add(cgroup);
 				}
 
-				jv.resetForNewExecution(resetTimestamp, globalModVersion);
+				jv.resetForNewExecution(resetTimestamp, expectedGlobalVersion);
 			}
 
 			for (int i = 0; i < stateTimestamps.length; i++) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 9b98de78143..91510d1af54 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -25,6 +25,7 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -57,12 +58,15 @@
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Iterator;
@@ -86,15 +90,20 @@
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.finishAllVertices;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.switchToRunning;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.spy;
 
+/**
+ * Tests the restart behaviour of the {@link ExecutionGraph}.
+ */
 public class ExecutionGraphRestartTest extends TestLogger {
 
-	private final static int NUM_TASKS = 31;
+	private static final int NUM_TASKS = 31;
 
 	private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
 
@@ -113,9 +122
[GitHub] tillrohrmann closed pull request #6811: [BP-1.6][FLINK-9788] Fix ExecutionGraph inconsistency for global failures when restarting

2018-10-11 Thread GitBox
tillrohrmann closed pull request #6811: [BP-1.6][FLINK-9788] Fix ExecutionGraph 
inconsistency for global failures when restarting
URL: https://github.com/apache/flink/pull/6811
 
 
   


[jira] [Commented] (FLINK-9788) ExecutionGraph Inconsistency prevents Job from recovering

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646551#comment-16646551
 ] 

ASF GitHub Bot commented on FLINK-9788:
---

tillrohrmann closed pull request #6810: [FLINK-9788] Fix ExecutionGraph 
inconsistency for global failures when restarting
URL: https://github.com/apache/flink/pull/6810
 
 
   

