[jira] [Assigned] (FLINK-10506) Introduce minimum, target and maximum parallelism to JobGraph
[ https://issues.apache.org/jira/browse/FLINK-10506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gary Yao reassigned FLINK-10506:
--------------------------------
    Assignee: Gary Yao

> Introduce minimum, target and maximum parallelism to JobGraph
> -------------------------------------------------------------
>
>                 Key: FLINK-10506
>                 URL: https://issues.apache.org/jira/browse/FLINK-10506
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager
>    Affects Versions: 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: Gary Yao
>            Priority: Major
>             Fix For: 1.7.0
>
> In order to run a job with a variable parallelism, one needs to be able to
> define the minimum and maximum parallelism for an operator as well as the
> current target value. In a first implementation, if no explicit parallelism
> has been specified for an operator, the minimum could be 1 and the maximum
> could be the operator's max parallelism. If a parallelism p has been
> specified (via setParallelism(p)), then minimum = maximum = p. The target
> value could be the command line parameter -p or the default parallelism.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
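The rule in the ticket can be sketched as follows. This is a minimal illustration only; the class and method names (`ParallelismBounds`, `of`) are hypothetical and not actual Flink API.

```java
// A sketch of the parallelism-bound rule described in the ticket.
// All names here are illustrative, not actual Flink API.
public class ParallelismBounds {
    public final int min;
    public final int target;
    public final int max;

    ParallelismBounds(int min, int target, int max) {
        this.min = min;
        this.target = target;
        this.max = max;
    }

    /**
     * Derives the bounds for one operator: an explicit parallelism p pins
     * min = target = max = p; otherwise min is 1, max is the operator's max
     * parallelism, and the target comes from -p or the default parallelism.
     */
    public static ParallelismBounds of(Integer explicitParallelism,
                                       int maxParallelism,
                                       int defaultParallelism) {
        if (explicitParallelism != null) {
            return new ParallelismBounds(
                explicitParallelism, explicitParallelism, explicitParallelism);
        }
        // Cap the target so it never exceeds the maximum.
        int target = Math.min(defaultParallelism, maxParallelism);
        return new ParallelismBounds(1, target, maxParallelism);
    }
}
```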
[jira] [Commented] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()
[ https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647542#comment-16647542 ]

Bowen Li commented on FLINK-10168:
----------------------------------

Well, the params of {{readFile()}} do feel a bit crowded, though. Hopefully at some point we can abstract all params into a configuration object, in the same style as other config-based sources.

> support filtering files by modified/created time in StreamExecutionEnvironment.readFile()
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-10168
>                 URL: https://issues.apache.org/jira/browse/FLINK-10168
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.6.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Major
>             Fix For: 1.7.0
>
> Support filtering files by modified/created time in {{StreamExecutionEnvironment.readFile()}}.
>
> For example, in a source dir with lots of files, we only want to read files that were created or modified after a specific time.
>
> This API can expose a generic filter function over files and let users define filtering rules. Currently Flink only supports filtering files by path: the API is {{FileInputFormat.setFilesFilters(PathFilter)}}, which takes only one file path filter. A more generic API that can take more filters could look like this:
> 1) {{FileInputFormat.setFilesFilters(List)}} with filter implementations such as {{PathFilter}}, {{ModifiedTimeFilter}}, ...
> 2) or {{FileInputFormat.setFilesFilters(FileFilter)}}, where {{FileFilter}} exposes all file attributes that Flink's file system can provide, like path and modified time.
>
> I lean towards the 2nd option, because it gives users more flexibility to define complex filtering rules based on combinations of file attributes.
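The 2nd option described above could be sketched like this. All names here (`FileFilterSketch`, `FileInfo`, `FileFilter`) are hypothetical, chosen only to illustrate the proposal; none of them are actual Flink API.

```java
import java.util.function.Predicate;

// Illustrative sketch of option 2: one generic filter interface that sees all
// file attributes the file system can provide. Names are hypothetical.
public class FileFilterSketch {

    /** Attributes a file system could expose to the filter. */
    static final class FileInfo {
        final String path;
        final long modificationTime; // epoch millis

        FileInfo(String path, long modificationTime) {
            this.path = path;
            this.modificationTime = modificationTime;
        }
    }

    /** The single, generic filter of option 2. */
    interface FileFilter extends Predicate<FileInfo> {}

    /** Example rule combining path and modified-time attributes. */
    static FileFilter modifiedAfterWithSuffix(long cutoffMillis, String suffix) {
        return f -> f.modificationTime > cutoffMillis && f.path.endsWith(suffix);
    }
}
```

Because the filter receives all attributes at once, users can freely combine path and time conditions in one rule, which is the flexibility argument made for option 2.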
[jira] [Updated] (FLINK-10533) job parallelism equals task slots number but not use the same number of the task slots as the parallelism
[ https://issues.apache.org/jira/browse/FLINK-10533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sean.miao updated FLINK-10533:
------------------------------
    Description:
I use the Table API and do not use the DataStream API.

My job has two graphs and each has a parallelism of two, so the total parallelism is four.

But if I give my job four slots, it only uses two slots.

!image-2018-10-12-10-36-13-503.png!
!image-2018-10-12-10-35-57-443.png!

Thanks.

  was:
I use the Table API and do not use the DataStream API.

My job has two graphs and each has a parallelism of two.

If I give my job four slots, it only uses two slots.

!image-2018-10-12-10-36-13-503.png!
!image-2018-10-12-10-35-57-443.png!

Thanks.

> job parallelism equals task slots number but not use the same number of the task slots as the parallelism
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-10533
>                 URL: https://issues.apache.org/jira/browse/FLINK-10533
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.5.3, 1.6.0, 1.6.1
>            Reporter: sean.miao
>            Priority: Major
>         Attachments: image-2018-10-12-10-35-57-443.png, image-2018-10-12-10-36-13-503.png
>
> I use the Table API and do not use the DataStream API.
>
> My job has two graphs and each has a parallelism of two, so the total parallelism is four.
>
> But if I give my job four slots, it only uses two slots.
>
> Thanks.
[jira] [Updated] (FLINK-10533) job parallelism equals task slots number but not use the same number of the task slots as the parallelism
[ https://issues.apache.org/jira/browse/FLINK-10533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sean.miao updated FLINK-10533:
------------------------------
    Summary: job parallelism equals task slots number but not use the same number of the task slots as the parallelism  (was: job parallelism equals task slots number but not use all task slot)
[jira] [Updated] (FLINK-10533) job parallelism equals task slots number but not use all task slot
[ https://issues.apache.org/jira/browse/FLINK-10533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sean.miao updated FLINK-10533:
------------------------------
    Summary: job parallelism equals task slots number but not use all task slot  (was: job parallelism equals task slots number but not use all tasl slot)
[jira] [Updated] (FLINK-10533) job parallelism equals task slots number but not use all tasl slot
[ https://issues.apache.org/jira/browse/FLINK-10533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sean.miao updated FLINK-10533:
------------------------------
    Summary: job parallelism equals task slots number but not use all tasl slot  (was: job parallelism equals task slot number but not use all tasl slot)
[jira] [Updated] (FLINK-10533) job parallelism equals task slot number but not use all tasl slot
[ https://issues.apache.org/jira/browse/FLINK-10533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sean.miao updated FLINK-10533:
------------------------------
    Affects Version/s: 1.5.3
                       1.6.0
[jira] [Commented] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()
[ https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647534#comment-16647534 ]

Bowen Li commented on FLINK-10168:
----------------------------------

On second thought, to extend my comment on making this function similar to the start-position feature of message queues: I feel defining it as a filter API would actually make it more complex than users anticipate. How about just providing a {{startPosition}} param to the {{readFile()}} API, to make it as simple as those in Flink's Kinesis/Kafka consumers? [~kkl0u] [~fhueske] This way it might also be easier for Flink SQL to parse when we need to add SQL file sources in the future.
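The {{startPosition}} idea above could look roughly like the start-position options of the Kafka/Kinesis consumers. The sketch below uses hypothetical names (`StartPosition`, `shouldRead`); none of them exist in Flink, they only illustrate the proposal.

```java
// Hypothetical sketch of a startPosition parameter for readFile(), modeled
// on the start-position options of Flink's Kafka/Kinesis consumers.
public class StartPositionSketch {

    enum StartPosition { EARLIEST, LATEST, FROM_TIMESTAMP }

    /**
     * Decides whether a file with the given modification time should be read,
     * given the configured start position.
     *
     * @param latestKnownTime modification time of the newest file already seen
     */
    static boolean shouldRead(StartPosition position, long startTimestamp,
                              long fileModificationTime, long latestKnownTime) {
        switch (position) {
            case EARLIEST:
                return true; // read everything: the current behavior
            case LATEST:
                return fileModificationTime > latestKnownTime; // only new files
            case FROM_TIMESTAMP:
                return fileModificationTime >= startTimestamp;
            default:
                throw new IllegalArgumentException("unknown position: " + position);
        }
    }
}
```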
[jira] [Comment Edited] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()
[ https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635923#comment-16635923 ]

Bowen Li edited comment on FLINK-10168 at 10/12/18 6:05 AM:
------------------------------------------------------------

[~kkl0u] what do you think about this task? I believe this is very important: it provides users with functions similar to those in streaming sources (Kinesis, Kafka), where users specify a start position in the stream to read data from a certain point in time.

  was (Author: phoenixjiangnan):
[~kkl0u] what do you think about this task? I believe this is very important: it provides users with functions similar to those in streaming sources (Kinesis, Kafka), where users can read data from a certain point in time.
[jira] [Assigned] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()
[ https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bowen Li reassigned FLINK-10168:
--------------------------------
    Assignee: Bowen Li  (was: Jiayi Liao)
[jira] [Commented] (FLINK-10534) Add idle timeout for a flink session cluster
[ https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647499#comment-16647499 ]

ouyangzhe commented on FLINK-10534:
-----------------------------------

[~yanghua] thanks :) .

> Add idle timeout for a flink session cluster
> --------------------------------------------
>
>                 Key: FLINK-10534
>                 URL: https://issues.apache.org/jira/browse/FLINK-10534
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management
>    Affects Versions: 1.7.0
>            Reporter: ouyangzhe
>            Priority: Major
>         Attachments: 屏幕快照 2018-10-12 上午10.24.08.png
>
> A Flink session cluster on YARN will always keep running even when it has no jobs running at all, occupying YARN resources for no use.
>
> Taskmanagers are released after an idle timeout, but the jobmanager keeps running.
>
> I propose to add a configuration that limits the idle timeout for the jobmanager too: if no job is running after the specified timeout, the Flink cluster finishes itself automatically.
[jira] [Commented] (FLINK-10534) Add idle timeout for a flink session cluster
[ https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647462#comment-16647462 ]

vinoyang commented on FLINK-10534:
----------------------------------

[~yeshan] I believe he will see it and give you permission, don't worry too much. There is a time difference between Germany and China; they have not started working yet.
[jira] [Commented] (FLINK-10534) Add idle timeout for a flink session cluster
[ https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647381#comment-16647381 ]

ouyangzhe commented on FLINK-10534:
-----------------------------------

Hi [~till.rohrmann], I'd like to contribute. Can you give me the authority to assign JIRA issues to myself?
[jira] [Commented] (FLINK-10534) Add idle timeout for a flink session cluster
[ https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647379#comment-16647379 ]

vinoyang commented on FLINK-10534:
----------------------------------

[~yeshan] It seems that you currently have no JIRA contribute permission? I am happy to ping [~till.rohrmann] or [~Zentol] to assign you permissions.
[jira] [Assigned] (FLINK-10534) Add idle timeout for a flink session cluster
[ https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

vinoyang reassigned FLINK-10534:
--------------------------------
    Assignee:     (was: vinoyang)
[jira] [Commented] (FLINK-10534) Add idle timeout for a flink session cluster
[ https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647376#comment-16647376 ]

ouyangzhe commented on FLINK-10534:
-----------------------------------

Hi [~yanghua], I already have a prototype for this issue. Can you help assign it to me so I can work on the feature? (I have no authority to assign it to myself.)
[jira] [Commented] (FLINK-10508) Port JobManagerITCase to new code base
[ https://issues.apache.org/jira/browse/FLINK-10508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647375#comment-16647375 ]

TisonKun commented on FLINK-10508:
----------------------------------

checklist:
- JobManagerITCase.The JobManager actor must handle jobs when not enough slots
- JobManagerITCase.The JobManager actor must support immediate scheduling of a single vertex
- JobManagerITCase.The JobManager actor must support queued scheduling of a single vertex
- JobManagerITCase.The JobManager actor must support forward jobs
- JobManagerITCase.The JobManager actor must support bipartite job
- JobManagerITCase.The JobManager actor must support two input job failing edge mismatch
- JobManagerITCase.The JobManager actor must support two input job
- JobManagerITCase.The JobManager actor must support scheduling all at once
- JobManagerITCase.The JobManager actor must handle job with a failing sender vertex
- JobManagerITCase.The JobManager actor must handle job with an occasionally failing sender vertex
- JobManagerITCase.The JobManager actor must handle job with a failing receiver vertex
- JobManagerITCase.The JobManager actor must handle job with all vertices failing during instantiation
- JobManagerITCase.The JobManager actor must handle job with some vertices failing during instantiation
- JobManagerITCase.The JobManager actor must check that all job vertices have completed the call to finalizeOnMaster before the job completes
- JobManagerITCase.The JobManager actor must remove execution graphs when the client ends the session explicitly
- JobManagerITCase.The JobManager actor must remove execution graphs when the client's session times out
- JobManagerITCase.The JobManager actor must handle trigger savepoint response for non-existing job
- JobManagerITCase.The JobManager actor must handle trigger savepoint response for job with disabled checkpointing
- JobManagerITCase.The JobManager actor must handle trigger savepoint response after trigger savepoint failure
- JobManagerITCase.The JobManager actor must handle failed savepoint triggering
- JobManagerITCase.The JobManager actor must handle trigger savepoint response after succeeded savepoint future

> Port JobManagerITCase to new code base
> --------------------------------------
>
>                 Key: FLINK-10508
>                 URL: https://issues.apache.org/jira/browse/FLINK-10508
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>    Affects Versions: 1.7.0
>            Reporter: TisonKun
>            Assignee: TisonKun
>            Priority: Major
>             Fix For: 1.7.0
>
> Port {{JobManagerITCase}} to new code base.
[jira] [Commented] (FLINK-10534) Add idle timeout for a flink session cluster
[ https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647373#comment-16647373 ]

vinoyang commented on FLINK-10534:
----------------------------------

[~isunjin] agree.
[jira] [Commented] (FLINK-10534) Add idle timeout for a flink session cluster
[ https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647372#comment-16647372 ]

JIN SUN commented on FLINK-10534:
---------------------------------

A session can have the semantics of an expiry time. We could probably add a parameter to specify the expiry time when creating the session (maybe on the command line); the default value can be infinite to keep the current behavior.
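The proposed idle-timeout check could be sketched as below. This is a minimal illustration assuming a hypothetical `IdleTimeoutTracker` helper; it is not actual Flink code, and a real implementation would live in the jobmanager's lifecycle management.

```java
// Illustrative sketch of the proposed idle-timeout behavior for the
// jobmanager of a session cluster. Names are hypothetical, not Flink API.
class IdleTimeoutTracker {
    private final long timeoutMillis;
    private long lastActiveMillis;

    IdleTimeoutTracker(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastActiveMillis = nowMillis;
    }

    /** Returns true once no job has been running for at least the timeout. */
    boolean shouldShutDown(int runningJobs, long nowMillis) {
        if (runningJobs > 0) {
            lastActiveMillis = nowMillis; // any running job resets the idle clock
            return false;
        }
        return nowMillis - lastActiveMillis >= timeoutMillis;
    }
}
```

With an infinite (or very large) timeout as the default, the current always-running behavior is preserved, matching the suggestion above.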
[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics
[ https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647370#comment-16647370 ]

ASF GitHub Bot commented on FLINK-10135:
----------------------------------------

yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#issuecomment-429191782

Displaying the errors here, then I will try to trigger a Travis rebuild:

```
17:18:24.891 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project flink-s3-fs-base: Compilation failure: Compilation failure:
17:18:24.891 [ERROR] /home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java:[38,1] cannot find symbol
17:18:24.891 [ERROR]   symbol:   static S3_MULTIPART_MIN_PART_SIZE
17:18:24.891 [ERROR]   location: class
17:18:24.891 [ERROR] /home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java:[38,1] cannot find symbol
17:18:24.891 [ERROR]   symbol:   static S3_MULTIPART_MIN_PART_SIZE
17:18:24.891 [ERROR]   location: class
17:18:24.891 [ERROR] /home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java:[137,57] cannot find symbol
17:18:24.891 [ERROR]   symbol:   variable S3_MULTIPART_MIN_PART_SIZE
17:18:24.891 [ERROR]   location: class org.apache.flink.fs.s3.common.writer.S3RecoverableWriter
17:18:24.891 [ERROR] /home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java:[137,72] toHadoopPath(org.apache.flink.core.fs.Path) is not public in org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; cannot be accessed from outside package
17:18:24.891 [ERROR] /home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java:[234,57] cannot find symbol
17:18:24.891 [ERROR]   symbol:   variable S3_MULTIPART_MIN_PART_SIZE
17:18:24.891 [ERROR]   location: class org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream
17:18:24.891 [ERROR] /home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java:[252,57] cannot find symbol
17:18:24.891 [ERROR]   symbol:   variable S3_MULTIPART_MIN_PART_SIZE
17:18:24.891 [ERROR]   location: class org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream
```

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> The JobManager doesn't report the cluster-level metrics
> --------------------------------------------------------
>
>                 Key: FLINK-10135
>                 URL: https://issues.apache.org/jira/browse/FLINK-10135
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager, Metrics
>    Affects Versions: 1.5.0, 1.6.0, 1.7.0
>            Reporter: Joey Echeverria
>            Assignee: vinoyang
>            Priority: Critical
>              Labels: pull-request-available
>
> In [the documentation for metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster] in the Flink 1.5.0 release, it says that the following metrics are reported by the JobManager:
> {noformat}
> numRegisteredTaskManagers
> numRunningJobs
> taskSlotsAvailable
> taskSlotsTotal
> {noformat}
> In the job manager REST endpoint ({{http://:8081/jobmanager/metrics}}), those metrics don't appear.
[GitHub] yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics
yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#issuecomment-429191782 The compile errors are shown below; I will then try to trigger a Travis rebuild:
```
17:18:24.891 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project flink-s3-fs-base: Compilation failure: Compilation failure:
17:18:24.891 [ERROR] /home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java:[38,1] cannot find symbol
17:18:24.891 [ERROR]   symbol: static S3_MULTIPART_MIN_PART_SIZE
17:18:24.891 [ERROR]   location: class
17:18:24.891 [ERROR] /home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java:[38,1] cannot find symbol
17:18:24.891 [ERROR]   symbol: static S3_MULTIPART_MIN_PART_SIZE
17:18:24.891 [ERROR]   location: class
17:18:24.891 [ERROR] /home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java:[137,57] cannot find symbol
17:18:24.891 [ERROR]   symbol: variable S3_MULTIPART_MIN_PART_SIZE
17:18:24.891 [ERROR]   location: class org.apache.flink.fs.s3.common.writer.S3RecoverableWriter
17:18:24.891 [ERROR] /home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java:[137,72] toHadoopPath(org.apache.flink.core.fs.Path) is not public in org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; cannot be accessed from outside package
17:18:24.891 [ERROR] /home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java:[234,57] cannot find symbol
17:18:24.891 [ERROR]   symbol: variable S3_MULTIPART_MIN_PART_SIZE
17:18:24.891 [ERROR]   location: class org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream
17:18:24.891 [ERROR] /home/travis/build/apache/flink/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java:[252,57] cannot find symbol
17:18:24.891 [ERROR]   symbol: variable S3_MULTIPART_MIN_PART_SIZE
17:18:24.891 [ERROR]   location: class org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream
```
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
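For readers unfamiliar with this failure mode: a "cannot find symbol: static ..." compile error usually means the target of an `import static` declaration was removed or relocated. A minimal, hypothetical sketch (names invented for illustration; this is not Flink's actual code):

```java
// Hypothetical minimal reproduction of the "cannot find symbol: static ..." error class.
// If a shared constant is removed or moved to another class, every
// `import static` of it fails to compile with exactly this kind of message.
public class StaticImportSketch {
    // Illustrative constant; the real S3 minimum multipart part size is defined elsewhere in Flink.
    public static final long S3_MULTIPART_MIN_PART_SIZE = 5L << 20; // 5 MiB in bytes

    public static void main(String[] args) {
        // Another class would reference this via:
        //   import static StaticImportSketch.S3_MULTIPART_MIN_PART_SIZE;
        // Removing or renaming the constant breaks that import with
        //   cannot find symbol: static S3_MULTIPART_MIN_PART_SIZE
        System.out.println(S3_MULTIPART_MIN_PART_SIZE);
    }
}
```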
[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics
[ https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647366#comment-16647366 ] ASF GitHub Bot commented on FLINK-10135: yanghua edited a comment on issue #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#issuecomment-429191342 I saw some compile errors not related to this PR change which caused the Travis build to fail, in `flink-s3-fs-hadoop`. cc @tillrohrmann and @zentol This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > The JobManager doesn't report the cluster-level metrics > --- > > Key: FLINK-10135 > URL: https://issues.apache.org/jira/browse/FLINK-10135 > Project: Flink > Issue Type: Bug > Components: JobManager, Metrics >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Joey Echeverria >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > > In [the documentation for > metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster] > in the Flink 1.5.0 release, it says that the following metrics are reported > by the JobManager: > {noformat} > numRegisteredTaskManagers > numRunningJobs > taskSlotsAvailable > taskSlotsTotal > {noformat} > In the job manager REST endpoint > ({{http://:8081/jobmanager/metrics}}), those metrics don't > appear. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics
[ https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647365#comment-16647365 ] ASF GitHub Bot commented on FLINK-10135: yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#issuecomment-429191342 I saw some errors not related to this PR change, from `flink-s3-fs-hadoop`. cc @tillrohrmann and @zentol This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > The JobManager doesn't report the cluster-level metrics > --- > > Key: FLINK-10135 > URL: https://issues.apache.org/jira/browse/FLINK-10135 > Project: Flink > Issue Type: Bug > Components: JobManager, Metrics >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Joey Echeverria >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > > In [the documentation for > metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster] > in the Flink 1.5.0 release, it says that the following metrics are reported > by the JobManager: > {noformat} > numRegisteredTaskManagers > numRunningJobs > taskSlotsAvailable > taskSlotsTotal > {noformat} > In the job manager REST endpoint > ({{http://:8081/jobmanager/metrics}}), those metrics don't > appear. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] yanghua edited a comment on issue #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics
yanghua edited a comment on issue #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#issuecomment-429191342 I saw some compile errors not related to this PR change which caused the Travis build to fail, in `flink-s3-fs-hadoop`. cc @tillrohrmann and @zentol This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics
yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#issuecomment-429191342 I saw some errors not related to this PR change, from `flink-s3-fs-hadoop`. cc @tillrohrmann and @zentol This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-10534) Add idle timeout for a flink session cluster
[ https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-10534: Assignee: vinoyang > Add idle timeout for a flink session cluster > > > Key: FLINK-10534 > URL: https://issues.apache.org/jira/browse/FLINK-10534 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Affects Versions: 1.7.0 >Reporter: ouyangzhe >Assignee: vinoyang >Priority: Major > Attachments: 屏幕快照 2018-10-12 上午10.24.08.png > > > The Flink session cluster on YARN always keeps running even when no jobs are > running at all, occupying YARN resources for no use. > TaskManagers are released after an idle timeout, but the JobManager keeps > running. > I propose to add a configuration to limit the idle timeout for the JobManager > too: if no job runs within the specified timeout, the Flink cluster should > automatically shut itself down. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10534) Add idle timeout for a flink session cluster
[ https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647352#comment-16647352 ] vinoyang commented on FLINK-10534: -- Hi [~yeshan], thank you for making this suggestion; it seems reasonable. [~till.rohrmann], what do you think? > Add idle timeout for a flink session cluster > > > Key: FLINK-10534 > URL: https://issues.apache.org/jira/browse/FLINK-10534 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Affects Versions: 1.7.0 >Reporter: ouyangzhe >Assignee: vinoyang >Priority: Major > Attachments: 屏幕快照 2018-10-12 上午10.24.08.png > > > The Flink session cluster on YARN always keeps running even when no jobs are > running at all, occupying YARN resources for no use. > TaskManagers are released after an idle timeout, but the JobManager keeps > running. > I propose to add a configuration to limit the idle timeout for the JobManager > too: if no job runs within the specified timeout, the Flink cluster should > automatically shut itself down. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10535) User jar is present in the flink job manager's class path
yinhua.dai created FLINK-10535: -- Summary: User jar is present in the flink job manager's class path Key: FLINK-10535 URL: https://issues.apache.org/jira/browse/FLINK-10535 Project: Flink Issue Type: Bug Components: YARN Affects Versions: 1.5.0 Reporter: yinhua.dai Run flink on yarn in attach and per-job mode, i.e. {code:java} flink run -m yarn-cluster -yn 1 *** user.jar {code} The user.jar is not present in the flink job manager's class path, but we observed that in Flink 1.3 it was by default put at the beginning of the job manager's class path. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10533) job parallelism equals task slot number but does not use all task slots
[ https://issues.apache.org/jira/browse/FLINK-10533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sean.miao updated FLINK-10533: -- Description: I use the Table API and do not use the DataStream API. My job has two graphs, each with a parallelism of two. If I give my job four slots, it uses only two slots. !image-2018-10-12-10-36-13-503.png! !image-2018-10-12-10-35-57-443.png! Thanks. was: my job has two graph and every parallelism is two. if give my job four slots but it just use two slots. !image-2018-10-12-10-36-13-503.png! !image-2018-10-12-10-35-57-443.png! thanks. > job parallelism equals task slot number but does not use all task slots > - > > Key: FLINK-10533 > URL: https://issues.apache.org/jira/browse/FLINK-10533 > Project: Flink > Issue Type: Bug >Affects Versions: 1.6.1 >Reporter: sean.miao >Priority: Major > Attachments: image-2018-10-12-10-35-57-443.png, > image-2018-10-12-10-36-13-503.png > > > I use the Table API and do not use the DataStream API. > > My job has two graphs, each with a parallelism of two. > If I give my job four slots, it uses only two slots. > > !image-2018-10-12-10-36-13-503.png! > !image-2018-10-12-10-35-57-443.png! > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10534) Add idle timeout for a flink session cluster
ouyangzhe created FLINK-10534: - Summary: Add idle timeout for a flink session cluster Key: FLINK-10534 URL: https://issues.apache.org/jira/browse/FLINK-10534 Project: Flink Issue Type: New Feature Components: Cluster Management Affects Versions: 1.7.0 Reporter: ouyangzhe Attachments: 屏幕快照 2018-10-12 上午10.24.08.png The Flink session cluster on YARN always keeps running even when no jobs are running at all, occupying YARN resources for no use. TaskManagers are released after an idle timeout, but the JobManager keeps running. I propose to add a configuration to limit the idle timeout for the JobManager too: if no job runs within the specified timeout, the Flink cluster should automatically shut itself down. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
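As a rough illustration of the proposal (purely hypothetical; these are not actual Flink classes or configuration keys), the JobManager could track the time of the last job activity and decide to shut the cluster down once a configured idle timeout elapses with no running jobs:

```java
// Hypothetical sketch of the proposed idle-timeout logic for a session cluster.
// All names are invented for illustration; Flink's real JobManager is not structured this way.
public class IdleTimeoutSketch {

    static class IdleTracker {
        private final long idleTimeoutMillis; // e.g. from a hypothetical config key
        private long lastActivityMillis;
        private int runningJobs;

        IdleTracker(long idleTimeoutMillis, long nowMillis) {
            this.idleTimeoutMillis = idleTimeoutMillis;
            this.lastActivityMillis = nowMillis;
        }

        void jobStarted(long nowMillis)  { runningJobs++; lastActivityMillis = nowMillis; }
        void jobFinished(long nowMillis) { runningJobs--; lastActivityMillis = nowMillis; }

        /** True only when no jobs are running AND the idle timeout has fully elapsed. */
        boolean shouldShutDown(long nowMillis) {
            return runningJobs == 0 && nowMillis - lastActivityMillis >= idleTimeoutMillis;
        }
    }

    public static void main(String[] args) {
        IdleTracker tracker = new IdleTracker(60_000, 0); // 60s idle timeout
        tracker.jobStarted(1_000);
        tracker.jobFinished(5_000);                        // cluster idle since t=5s
        System.out.println(tracker.shouldShutDown(30_000)); // only 25s idle -> keep running
        System.out.println(tracker.shouldShutDown(70_000)); // 65s idle -> shut down
    }
}
```

In a real implementation this check would run on a periodic timer in the cluster entrypoint, mirroring how TaskManagers are already released after their own idle timeout.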
[jira] [Created] (FLINK-10533) job parallelism equals task slot number but does not use all task slots
sean.miao created FLINK-10533: - Summary: job parallelism equals task slot number but does not use all task slots Key: FLINK-10533 URL: https://issues.apache.org/jira/browse/FLINK-10533 Project: Flink Issue Type: Bug Affects Versions: 1.6.1 Reporter: sean.miao Attachments: image-2018-10-12-10-35-57-443.png, image-2018-10-12-10-36-13-503.png My job has two graphs, each with a parallelism of two. If I give my job four slots, it uses only two slots. !image-2018-10-12-10-36-13-503.png! !image-2018-10-12-10-35-57-443.png! Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10367) Avoid recursion stack overflow during releasing SingleInputGate
[ https://issues.apache.org/jira/browse/FLINK-10367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647316#comment-16647316 ] ASF GitHub Bot commented on FLINK-10367: zhijiangW opened a new pull request #6829: [FLINK-10367][network] Introduce NotificationResult for BufferListener to solve recursive stack overflow URL: https://github.com/apache/flink/pull/6829 ## What is the purpose of the change *In `LocalBufferPool#recycle`, a `BufferListener` is notified of the recycled buffer. But that `BufferListener` may no longer need floating buffers, so the buffer is recycled back to the `LocalBufferPool`, and another `BufferListener` is selected to be notified of the available buffer. This process can repeat recursively and cause a stack overflow error in extreme cases. We encountered this error when releasing all resources during task failover in a large-scale job; notably, the stack overflow also results in a buffer leak.* ## Brief change log - *Introduce `NotificationResult` to describe the result of notifying a `BufferListener`* - *Adjust `LocalBufferPool#recycle` to avoid the recursive call* - *Adjust `RemoteInputChannel#notifyBufferAvailable` accordingly* ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Avoid recursion stack overflow during releasing SingleInputGate > --- > > Key: FLINK-10367 > URL: https://issues.apache.org/jira/browse/FLINK-10367 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > > On task failure or canceling, {{SingleInputGate#releaseAllResources}} > will be invoked before the task exits. > In {{SingleInputGate#releaseAllResources}}, we first loop to > release all the input channels, then destroy the {{BufferPool}}. > {{RemoteInputChannel#releaseAllResources}} returns floating buffers > to the {{BufferPool}}, which assigns each recycled buffer to another > listener ({{RemoteInputChannel}}). > A recursive call may occur in this process. If the listener was already > released, it directly recycles the buffer back to the {{BufferPool}}, > which picks another listener to notify of the available buffer. This > process may repeat recursively. > If many input channels are registered as listeners in the {{BufferPool}}, the > recursion causes a {{StackOverflow}} error. In our test job, a scale of > 10,000 input channels triggered this error. > I can think of two ways to solve this potential problem: > # When an input channel is released, it should notify the {{BufferPool}} to > unregister the listener; otherwise the two become inconsistent. > # {{SingleInputGate}} should destroy the {{BufferPool}} first, then loop to > release all the internal input channels. That way, all the listeners in > {{BufferPool}} will be removed during destruction, and the input channel will > not have further interactions during > {{RemoteInputChannel#releaseAllResources}}. > I prefer the second way, because we do not want to > add another interface method for removing a buffer listener; furthermore, > the internal data structure in the {{BufferPool}} currently cannot support > removing a listener directly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10367) Avoid recursion stack overflow during releasing SingleInputGate
[ https://issues.apache.org/jira/browse/FLINK-10367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10367: --- Labels: pull-request-available (was: ) > Avoid recursion stack overflow during releasing SingleInputGate > --- > > Key: FLINK-10367 > URL: https://issues.apache.org/jira/browse/FLINK-10367 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > > On task failure or canceling, {{SingleInputGate#releaseAllResources}} > will be invoked before the task exits. > In {{SingleInputGate#releaseAllResources}}, we first loop to > release all the input channels, then destroy the {{BufferPool}}. > {{RemoteInputChannel#releaseAllResources}} returns floating buffers > to the {{BufferPool}}, which assigns each recycled buffer to another > listener ({{RemoteInputChannel}}). > A recursive call may occur in this process. If the listener was already > released, it directly recycles the buffer back to the {{BufferPool}}, > which picks another listener to notify of the available buffer. This > process may repeat recursively. > If many input channels are registered as listeners in the {{BufferPool}}, the > recursion causes a {{StackOverflow}} error. In our test job, a scale of > 10,000 input channels triggered this error. > I can think of two ways to solve this potential problem: > # When an input channel is released, it should notify the {{BufferPool}} to > unregister the listener; otherwise the two become inconsistent. > # {{SingleInputGate}} should destroy the {{BufferPool}} first, then loop to > release all the internal input channels. That way, all the listeners in > {{BufferPool}} will be removed during destruction, and the input channel will > not have further interactions during > {{RemoteInputChannel#releaseAllResources}}. > I prefer the second way, because we do not want to > add another interface method for removing a buffer listener; furthermore, > the internal data structure in the {{BufferPool}} currently cannot support > removing a listener directly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zhijiangW opened a new pull request #6829: [FLINK-10367][network] Introduce NotificationResult for BufferListener to solve recursive stack overflow
zhijiangW opened a new pull request #6829: [FLINK-10367][network] Introduce NotificationResult for BufferListener to solve recursive stack overflow URL: https://github.com/apache/flink/pull/6829 ## What is the purpose of the change *In `LocalBufferPool#recycle`, a `BufferListener` is notified of the recycled buffer. But that `BufferListener` may no longer need floating buffers, so the buffer is recycled back to the `LocalBufferPool`, and another `BufferListener` is selected to be notified of the available buffer. This process can repeat recursively and cause a stack overflow error in extreme cases. We encountered this error when releasing all resources during task failover in a large-scale job; notably, the stack overflow also results in a buffer leak.* ## Brief change log - *Introduce `NotificationResult` to describe the result of notifying a `BufferListener`* - *Adjust `LocalBufferPool#recycle` to avoid the recursive call* - *Adjust `RemoteInputChannel#notifyBufferAvailable` accordingly* ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
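To make the fix concrete, here is a simplified, self-contained sketch of the NotificationResult idea (all names invented for illustration; this is not Flink's actual implementation): the pool offers a recycled buffer to listeners in a loop, so a chain of declining listeners consumes no stack depth, whereas a recursive `recycle()` would add one stack frame per declining listener.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified illustration of replacing a recursive buffer-recycle
// notification with an iterative loop, in the spirit of FLINK-10367.
public class IterativeRecycleSketch {

    /** Result of notifying a listener: did it keep the buffer or decline it? */
    enum NotificationResult { BUFFER_USED, BUFFER_NOT_USED }

    interface BufferListener {
        NotificationResult notifyBufferAvailable(int buffer);
    }

    static class LocalBufferPool {
        private final Queue<BufferListener> listeners = new ArrayDeque<>();
        private final Queue<Integer> availableBuffers = new ArrayDeque<>();

        void registerListener(BufferListener listener) { listeners.add(listener); }

        // Iterative recycle: keep offering the buffer to the next listener until
        // one uses it or no listeners remain. A recursive version would instead
        // call recycle() again from inside each declining listener, growing the
        // stack by one frame per listener.
        void recycle(int buffer) {
            NotificationResult result = NotificationResult.BUFFER_NOT_USED;
            while (result == NotificationResult.BUFFER_NOT_USED) {
                BufferListener listener = listeners.poll();
                if (listener == null) {
                    availableBuffers.add(buffer); // no listener wants it; pool keeps it
                    return;
                }
                result = listener.notifyBufferAvailable(buffer);
            }
        }

        int poolSize() { return availableBuffers.size(); }
    }

    public static void main(String[] args) {
        LocalBufferPool pool = new LocalBufferPool();
        // 10,000 already-released channels that all decline the buffer;
        // at this scale a recursive notification chain could overflow the stack.
        for (int i = 0; i < 10_000; i++) {
            pool.registerListener(b -> NotificationResult.BUFFER_NOT_USED);
        }
        pool.recycle(42);
        System.out.println(pool.poolSize()); // buffer safely returned to the pool
    }
}
```

The loop also makes the "buffer leak after stack overflow" impossible in this path: the buffer always ends up either with a listener or back in the pool.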
[jira] [Commented] (FLINK-9544) Downgrade kinesis protocol from CBOR to JSON not possible as required by kinesalite
[ https://issues.apache.org/jira/browse/FLINK-9544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647280#comment-16647280 ] Diego Carvallo commented on FLINK-9544: --- [~eyushin] That does not work either in v1.6.0. I tried several options and none of them worked locally: {code:java} env.java.opts: "-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor" env.java.opts: "-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor=true" env.java.opts: "-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor=1" env.java.opts: "-Dcom.amazonaws.sdk.disableCbor" env.java.opts: "-Dcom.amazonaws.sdk.disableCbor=true" env.java.opts: "-Dcom.amazonaws.sdk.disableCbor=1" {code} I think it may be related to what's explained in this thread: [https://stackoverflow.com/questions/42344624/apache-flink-custom-java-options-are-not-recognized-inside-job] > Downgrade kinesis protocol from CBOR to JSON not possible as required by > kinesalite > --- > > Key: FLINK-9544 > URL: https://issues.apache.org/jira/browse/FLINK-9544 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.4.0, 1.5.0, 1.4.1, 1.4.2 >Reporter: Ph.Duveau >Priority: Critical > > The Amazon client does not downgrade from CBOR to JSON when the env var > AWS_CBOR_DISABLE is set to true (or 1) and/or > com.amazonaws.sdk.disableCbor=true is defined via JVM options. This bug is due to the Maven > shade relocation of com.amazon.* classes. As soon as you cancel this > relocation (by removing the relocation in the kinesis connector or by > re-relocating in the final jar), it works again. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10265) Configure checkpointing behavior for SQL Client
[ https://issues.apache.org/jira/browse/FLINK-10265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647218#comment-16647218 ] sunjincheng commented on FLINK-10265: - Hi,[~twalthr] [~yanghua] I agree that we should unify the batch and streaming. BTW we should add the checkpoint timeout attribute configuration as well. something like : {code:java} ... checkpointing: timeout: 60 ...{code} > Configure checkpointing behavior for SQL Client > --- > > Key: FLINK-10265 > URL: https://issues.apache.org/jira/browse/FLINK-10265 > Project: Flink > Issue Type: New Feature > Components: SQL Client >Reporter: Timo Walther >Assignee: vinoyang >Priority: Major > > The SQL Client environment file should expose checkpointing related > properties: > - enable checkpointing > - checkpointing interval > - mode > - timeout > - etc. see {{org.apache.flink.streaming.api.environment.CheckpointConfig}} > Per-job selection of state backends and their configuration. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
[ https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647203#comment-16647203 ] ASF GitHub Bot commented on FLINK-10516: suez1224 commented on a change in pull request #6828: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup URL: https://github.com/apache/flink/pull/6828#discussion_r224639273 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java ## @@ -109,4 +120,37 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { taskManagerConf, workingDirectory, taskManagerMainClass, LOG); assertEquals("file", ctx.getLocalResources().get("flink.jar").getResource().getScheme()); } + + @Test + public void testRunAndInitializeFileSystem() throws Exception { + // Mock necessary system variables + Map map = new HashMap(System.getenv()); + map.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, "foo"); + // Create dynamic properties to be used in the Flink configuration + map.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, "myKey=myValue"); + CommonTestUtils.setEnv(map); + + // Create a temporary flink-conf.yaml and to be deleted on JVM exits + File currDir = new File(System.getenv().get(ApplicationConstants.Environment.PWD.key())); + String path = String.format("%s/%s.%s", currDir, "flink-conf", "yaml"); + File f = new File(path); + f.createNewFile(); + f.deleteOnExit(); + + // Mock FileSystem.initialize() + PowerMockito.mockStatic(FileSystem.class); + PowerMockito.doNothing().when(FileSystem.class); + FileSystem.initialize(any(Configuration.class)); + + String[] args = new String[5]; + YarnApplicationMasterRunner yarnApplicationMasterRunner = new YarnApplicationMasterRunner(); + yarnApplicationMasterRunner.run(args); Review comment: I think we should refactor the code like YarnTaskManagerRunnerFactoryTest. Running the entire appMaster code in unittest is too much. 
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink > Configuration during setup > --- > > Key: FLINK-10516 > URL: https://issues.apache.org/jira/browse/FLINK-10516 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.4.0, 1.5.0, 1.6.0, 1.7.0 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > Will add a fix, and refactor YarnApplicationMasterRunner to add a unittest to > prevent future regression. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] suez1224 commented on a change in pull request #6828: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
suez1224 commented on a change in pull request #6828: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup URL: https://github.com/apache/flink/pull/6828#discussion_r224639273 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
```
@@ -109,4 +120,37 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
 			taskManagerConf, workingDirectory, taskManagerMainClass, LOG);
 		assertEquals("file", ctx.getLocalResources().get("flink.jar").getResource().getScheme());
 	}
+
+	@Test
+	public void testRunAndInitializeFileSystem() throws Exception {
+		// Mock necessary system variables
+		Map<String, String> map = new HashMap<>(System.getenv());
+		map.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, "foo");
+		// Create dynamic properties to be used in the Flink configuration
+		map.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, "myKey=myValue");
+		CommonTestUtils.setEnv(map);
+
+		// Create a temporary flink-conf.yaml and to be deleted on JVM exits
+		File currDir = new File(System.getenv().get(ApplicationConstants.Environment.PWD.key()));
+		String path = String.format("%s/%s.%s", currDir, "flink-conf", "yaml");
+		File f = new File(path);
+		f.createNewFile();
+		f.deleteOnExit();
+
+		// Mock FileSystem.initialize()
+		PowerMockito.mockStatic(FileSystem.class);
+		PowerMockito.doNothing().when(FileSystem.class);
+		FileSystem.initialize(any(Configuration.class));
+
+		String[] args = new String[5];
+		YarnApplicationMasterRunner yarnApplicationMasterRunner = new YarnApplicationMasterRunner();
+		yarnApplicationMasterRunner.run(args);
```
Review comment: I think we should refactor the code like YarnTaskManagerRunnerFactoryTest. Running the entire appMaster code in unittest is too much. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
[ https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647200#comment-16647200 ] ASF GitHub Bot commented on FLINK-10516: suez1224 commented on a change in pull request #6828: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup URL: https://github.com/apache/flink/pull/6828#discussion_r224638155 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java ## @@ -109,4 +120,37 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { taskManagerConf, workingDirectory, taskManagerMainClass, LOG); assertEquals("file", ctx.getLocalResources().get("flink.jar").getResource().getScheme()); } + + @Test + public void testRunAndInitializeFileSystem() throws Exception { + // Mock necessary system variables + Map map = new HashMap(System.getenv()); + map.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, "foo"); + // Create dynamic properties to be used in the Flink configuration + map.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, "myKey=myValue"); + CommonTestUtils.setEnv(map); + + // Create a temporary flink-conf.yaml and to be deleted on JVM exits + File currDir = new File(System.getenv().get(ApplicationConstants.Environment.PWD.key())); Review comment: There is already a flink-conf.yaml in the test resource directory. You can reuse it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink > Configuration during setup > --- > > Key: FLINK-10516 > URL: https://issues.apache.org/jira/browse/FLINK-10516 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.4.0, 1.5.0, 1.6.0, 1.7.0 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > Will add a fix, and refactor YarnApplicationMasterRunner to add a unittest to > prevent future regression. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] suez1224 commented on a change in pull request #6828: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
suez1224 commented on a change in pull request #6828: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup URL: https://github.com/apache/flink/pull/6828#discussion_r224638155 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
```
@@ -109,4 +120,37 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
 			taskManagerConf, workingDirectory, taskManagerMainClass, LOG);
 		assertEquals("file", ctx.getLocalResources().get("flink.jar").getResource().getScheme());
 	}
+
+	@Test
+	public void testRunAndInitializeFileSystem() throws Exception {
+		// Mock necessary system variables
+		Map<String, String> map = new HashMap<>(System.getenv());
+		map.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, "foo");
+		// Create dynamic properties to be used in the Flink configuration
+		map.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, "myKey=myValue");
+		CommonTestUtils.setEnv(map);
+
+		// Create a temporary flink-conf.yaml and to be deleted on JVM exits
+		File currDir = new File(System.getenv().get(ApplicationConstants.Environment.PWD.key()));
```
Review comment: There is already a flink-conf.yaml in the test resource directory. You can reuse it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
[ https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647196#comment-16647196 ] ASF GitHub Bot commented on FLINK-10516: suez1224 commented on issue #6828: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup URL: https://github.com/apache/flink/pull/6828#issuecomment-429158706 Can you write the fix against the current master? Also, please fix the test failures. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink > Configuration during setup > --- > > Key: FLINK-10516 > URL: https://issues.apache.org/jira/browse/FLINK-10516 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.4.0, 1.5.0, 1.6.0, 1.7.0 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > Will add a fix, and refactor YarnApplicationMasterRunner to add a unittest to > prevent future regression. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5465) RocksDB fails with segfault while calling AbstractRocksDBState.clear()
[ https://issues.apache.org/jira/browse/FLINK-5465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647154#comment-16647154 ] ASF GitHub Bot commented on FLINK-5465: --- zorro786 commented on issue #5058: [FLINK-5465] [streaming] Wait for pending timer threads to finish or … URL: https://github.com/apache/flink/pull/5058#issuecomment-429150903 @StefanRRichter It is mentioned in Jira ticket that this affects versions 1.5.0, 1.4.2, 1.6.0. Could you please tell why it doesn't affect 1.4.0? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > RocksDB fails with segfault while calling AbstractRocksDBState.clear() > -- > > Key: FLINK-5465 > URL: https://issues.apache.org/jira/browse/FLINK-5465 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Stefan Richter >Priority: Major > Fix For: 1.4.0, 1.5.0 > > Attachments: hs-err-pid26662.log > > > I'm using Flink 699f4b0. > {code} > # > # A fatal error has been detected by the Java Runtime Environment: > # > # SIGSEGV (0xb) at pc=0x7f91a0d49b78, pid=26662, tid=140263356024576 > # > # JRE version: Java(TM) SE Runtime Environment (7.0_67-b01) (build > 1.7.0_67-b01) > # Java VM: Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode > linux-amd64 compressed oops) > # Problematic frame: > # C [librocksdbjni-linux64.so+0x1aeb78] > rocksdb::GetColumnFamilyID(rocksdb::ColumnFamilyHandle*)+0x8 > # > # Failed to write core dump. Core dumps have been disabled. 
To enable core > dumping, try "ulimit -c unlimited" before starting Java again > # > # An error report file with more information is saved as: > # > /yarn/nm/usercache/robert/appcache/application_1484132267957_0007/container_1484132267957_0007_01_10/hs_err_pid26662.log > Compiled method (nm) 1869778 903 n org.rocksdb.RocksDB::remove > (native) > total in heap [0x7f91b40b9dd0,0x7f91b40ba150] = 896 > relocation [0x7f91b40b9ef0,0x7f91b40b9f48] = 88 > main code [0x7f91b40b9f60,0x7f91b40ba150] = 496 > # > # If you would like to submit a bug report, please visit: > # http://bugreport.sun.com/bugreport/crash.jsp > # The crash happened outside the Java Virtual Machine in native code. > # See problematic frame for where to report the bug. > # > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10494) Rename 'JobManager' to 'JobMaster' for some classes in JobMaster folder
[ https://issues.apache.org/jira/browse/FLINK-10494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JIN SUN closed FLINK-10494. --- Resolution: Won't Fix Closed as suggested. > Rename 'JobManager' to 'JobMaster' for some classes in JobMaster folder > --- > > Key: FLINK-10494 > URL: https://issues.apache.org/jira/browse/FLINK-10494 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Affects Versions: 1.6.0, 1.7.0 >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.0 > > > Some names in the > "flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster" folder are > confusing; we should rename them to use JobMaster. > > * JobManagerRunner -> JobMasterRunner > * JobManagerGateway -> JobMasterGateway > * JobManagerSharedServices -> JobMasterSharedServices > * JobManagerException -> JobMasterException -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8424) [Cassandra Connector] Update Cassandra version to last one
[ https://issues.apache.org/jira/browse/FLINK-8424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647005#comment-16647005 ] ASF GitHub Bot commented on FLINK-8424: --- bmeriaux commented on issue #6715: [FLINK-8424] update cassandra and driver version to latest URL: https://github.com/apache/flink/pull/6715#issuecomment-429109568 Ok, thanks for the info on the versioning policy. So will this be merged into master? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > [Cassandra Connector] Update Cassandra version to last one > -- > > Key: FLINK-8424 > URL: https://issues.apache.org/jira/browse/FLINK-8424 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Reporter: Joao Boto >Priority: Major > Labels: pull-request-available > > The Cassandra connector is using an outdated version. > This issue is to upgrade the Cassandra version to a newer one: > https://git1-us-west.apache.org/repos/asf?p=cassandra.git;a=blob_plain;f=CHANGES.txt;hb=refs/tags/cassandra-3.11.1 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10532) Broken links in documentation
Chesnay Schepler created FLINK-10532: Summary: Broken links in documentation Key: FLINK-10532 URL: https://issues.apache.org/jira/browse/FLINK-10532 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.7.0 Reporter: Chesnay Schepler Fix For: 1.7.0 https://travis-ci.org/apache/flink/builds/440115490#L599 {code:java} http://localhost:4000/dev/stream/operators.html: Remote file does not exist -- broken link!!! -- http://localhost:4000/dev/table/streaming/sql.html: Remote file does not exist -- broken link!!! http://localhost:4000/dev/table/streaming.html: Remote file does not exist -- broken link!!!{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9943) Support TaskManagerMetricQueryServicePaths msg in JobManager Actor
[ https://issues.apache.org/jira/browse/FLINK-9943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646923#comment-16646923 ] ASF GitHub Bot commented on FLINK-9943: --- isunjin commented on issue #6429: [FLINK-9943] Support TaskManagerMetricQueryServicePaths msg in JobManager Actor URL: https://github.com/apache/flink/pull/6429#issuecomment-429081228 Could you also close the JIRA? @chuanlei This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support TaskManagerMetricQueryServicePaths msg in JobManager Actor > -- > > Key: FLINK-9943 > URL: https://issues.apache.org/jira/browse/FLINK-9943 > Project: Flink > Issue Type: New Feature > Components: Core >Affects Versions: 1.5.0, 1.5.1 >Reporter: Chuanlei Ni >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The reasons are as follows: > # AkkaJobManagerGateway wraps the JM actor ref and supports this functionality by > first requesting RegisteredTaskManagers and then asking each TaskManager actor for > its metric query service path one by one. This procedure wastes resources; > it would be more efficient to support the functionality in the JM actor directly. > # We can expose the Flink metric system directly to external systems (such as the > Flink client and the like) to support more features in the future. For now, the > metric system is only partially exposed because Instance cannot (and should > not) be transferred remotely. This feature will make metrics exposure > consistent. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
[ https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10516: --- Labels: pull-request-available (was: ) > YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink > Configuration during setup > --- > > Key: FLINK-10516 > URL: https://issues.apache.org/jira/browse/FLINK-10516 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.4.0, 1.5.0, 1.6.0, 1.7.0 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > Will add a fix, and refactor YarnApplicationMasterRunner to add a unittest to > prevent future regression. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
[ https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646859#comment-16646859 ] ASF GitHub Bot commented on FLINK-10516: yanyan300300 opened a new pull request #6828: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup URL: https://github.com/apache/flink/pull/6828

## What is the purpose of the change
This pull request makes YarnApplicationMasterRunner explicitly initialize FileSystem with the Flink Configuration, so that the HadoopFileSystem can take in the custom HDFS configuration (e.g. core-site.xml under fs.hdfs.hadoopconf) for Flink 1.4.

## Verifying this change
This change added tests and can be verified as follows:
- Added a unit test to verify that FileSystem.initialize() is called with the correct Flink Config when YarnApplicationMasterRunner.run() is called

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes
- The S3 file system connector: no

## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink > Configuration during setup > --- > > Key: FLINK-10516 > URL: https://issues.apache.org/jira/browse/FLINK-10516 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.4.0, 1.5.0, 1.6.0, 1.7.0 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > Will add a fix, and refactor YarnApplicationMasterRunner to add a unittest to > prevent future regression. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
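The core of the fix described above is an ordering contract: load the Flink Configuration (flink-conf.yaml plus dynamic properties) first, then hand it to the FileSystem layer before anything touches HDFS. A minimal sketch of that ordering follows; only the initialize-before-use idea mirrors Flink's `FileSystem.initialize(Configuration)`, while the `Mini*` classes are simplified stand-ins, not Flink's real types.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for Flink's Configuration (assumption: the real class is richer). */
class MiniConfiguration {
    private final Map<String, String> values = new HashMap<>();
    void setString(String key, String value) { values.put(key, value); }
    String getString(String key, String defaultValue) { return values.getOrDefault(key, defaultValue); }
}

/** Stand-in for Flink's FileSystem holder; only the initialize-before-use ordering mirrors the real API. */
class MiniFileSystem {
    private static MiniConfiguration config = new MiniConfiguration();

    /** Mirrors the role of FileSystem.initialize(Configuration): install the config before any FS access. */
    static void initialize(MiniConfiguration newConfig) { config = newConfig; }

    /** Example of a setting a Hadoop-backed FS would pick up, e.g. where core-site.xml lives. */
    static String hadoopConfDir() { return config.getString("fs.hdfs.hadoopconf", "<unset>"); }
}

public class AmSetupOrderSketch {
    public static void main(String[] args) {
        // 1) The application master first loads flink-conf.yaml plus dynamic properties...
        MiniConfiguration flinkConfig = new MiniConfiguration();
        flinkConfig.setString("fs.hdfs.hadoopconf", "/etc/hadoop/conf");
        // 2) ...and only then initializes the FileSystem layer, so later FS calls see the custom config.
        MiniFileSystem.initialize(flinkConfig);
        System.out.println(MiniFileSystem.hadoopConfDir()); // prints "/etc/hadoop/conf"
    }
}
```

The bug being fixed is exactly the inverted ordering: if the filesystem is touched before step 2, it falls back to defaults and the custom HDFS configuration is silently ignored.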
[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646800#comment-16646800 ] ASF GitHub Bot commented on FLINK-10423: StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224531500 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.View; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.ResourceGuard; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A monitor which pull {{@link RocksDB}} native metrics + * and forwards them to Flink's metric group. All metrics are + * unsigned longs and are reported at the column family level. + */ +@Internal +public class RocksDBNativeMetricMonitor implements Closeable { + + private final CloseableRegistry registeredGauges; + + private final RocksDB db; + + private final ResourceGuard.Lease lease; + + private final RocksDBNativeMetricOptions options; + + private final MetricGroup metricGroup; + + RocksDBNativeMetricMonitor( + @Nonnull RocksDB db, + @Nonnull ResourceGuard guard, + @Nonnull RocksDBNativeMetricOptions options, + @Nonnull MetricGroup metricGroup + ) throws IOException { + this.db = db; + this.lease = guard.acquireResource(); + this.options = options; + this.metricGroup = metricGroup; + + this.registeredGauges = new CloseableRegistry(); + } + + /** +* Register gauges to pull native metrics for the column family. 
+* @param columnFamilyName group name for the new gauges +* @param handle native handle to the column family +*/ + void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) { + try { + MetricGroup group = metricGroup.addGroup(columnFamilyName); + + for (String property : options.getProperties()) { + RocksDBNativeMetricView gauge = new RocksDBNativeMetricView( + property, + handle, + db + ); + + group.gauge(property, gauge); + registeredGauges.registerCloseable(gauge); + } + } catch (IOException e) { + throw new FlinkRuntimeException("Unable to register native metrics with RocksDB", e); + } + } + + @Override + public void close() { + IOUtils.closeQuietly(registeredGauges); + IOUtils.closeQuietly(lease); + } + + static class RocksDBNativeMetricView implements Gauge, View, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RocksDBNativeMetricView.class); + + private final String property; + + private final ColumnFamilyHandle handle; + + private final RocksDB db; + + private volatile boolean open; + + private long value; + + private RocksDBNativeMetricView( + @Nonnull String property, +
[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646802#comment-16646802 ] ASF GitHub Bot commented on FLINK-10423: StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224529147 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -606,6 +627,17 @@ private RocksDB openDB( Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(), "Not all requested column family handles have been created"); + if (this.metricOptions.isEnabled()) { + this.nativeMetricMonitor = new RocksDBNativeMetricMonitor( + dbRef, + rocksDBResourceGuard, + metricOptions, + operatorMetricGroup + ); + + this.cancelStreamRegistry.registerCloseable(nativeMetricMonitor); Review comment: I think this is not a good way for a clean shutdown. Instead I would suggest to close this in the `dispose()` method. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Forward RocksDB native metrics to Flink metrics reporter > - > > Key: FLINK-10423 > URL: https://issues.apache.org/jira/browse/FLINK-10423 > Project: Flink > Issue Type: New Feature > Components: Metrics, State Backends, Checkpointing >Reporter: Seth Wiesman >Assignee: Seth Wiesman >Priority: Major > Labels: pull-request-available > > RocksDB contains a number of metrics at the column family level about current > memory usage, open memtables, etc that would be useful to users wishing > greater insight what rocksdb is doing. 
This work is inspired heavily by the > comments on this rocksdb issue thread > (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646801#comment-16646801 ] ASF GitHub Bot commented on FLINK-10423: StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224534588 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.View; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.ResourceGuard; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A monitor which pull {{@link RocksDB}} native metrics + * and forwards them to Flink's metric group. All metrics are + * unsigned longs and are reported at the column family level. + */ +@Internal +public class RocksDBNativeMetricMonitor implements Closeable { + + private final CloseableRegistry registeredGauges; + + private final RocksDB db; + + private final ResourceGuard.Lease lease; + + private final RocksDBNativeMetricOptions options; + + private final MetricGroup metricGroup; + + RocksDBNativeMetricMonitor( + @Nonnull RocksDB db, + @Nonnull ResourceGuard guard, + @Nonnull RocksDBNativeMetricOptions options, + @Nonnull MetricGroup metricGroup + ) throws IOException { + this.db = db; + this.lease = guard.acquireResource(); + this.options = options; + this.metricGroup = metricGroup; + + this.registeredGauges = new CloseableRegistry(); + } + + /** +* Register gauges to pull native metrics for the column family. 
+* @param columnFamilyName group name for the new gauges +* @param handle native handle to the column family +*/ + void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) { + try { + MetricGroup group = metricGroup.addGroup(columnFamilyName); + + for (String property : options.getProperties()) { + RocksDBNativeMetricView gauge = new RocksDBNativeMetricView( + property, + handle, + db + ); + + group.gauge(property, gauge); + registeredGauges.registerCloseable(gauge); + } + } catch (IOException e) { + throw new FlinkRuntimeException("Unable to register native metrics with RocksDB", e); + } + } + + @Override + public void close() { + IOUtils.closeQuietly(registeredGauges); + IOUtils.closeQuietly(lease); + } + + static class RocksDBNativeMetricView implements Gauge, View, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RocksDBNativeMetricView.class); + + private final String property; + + private final ColumnFamilyHandle handle; + + private final RocksDB db; + + private volatile boolean open; + + private long value; + + private RocksDBNativeMetricView( + @Nonnull String property, +
[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646803#comment-16646803 ] ASF GitHub Bot commented on FLINK-10423: StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224533004 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.View; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.ResourceGuard; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A monitor which pull {{@link RocksDB}} native metrics + * and forwards them to Flink's metric group. All metrics are + * unsigned longs and are reported at the column family level. + */ +@Internal +public class RocksDBNativeMetricMonitor implements Closeable { + + private final CloseableRegistry registeredGauges; + + private final RocksDB db; + + private final ResourceGuard.Lease lease; Review comment: I think a lease is not required if this object is closed in the beginning `RocksDBKeyedStateBackend.dispose()`. The lease is rather required for parallel threads that use the db and we don't know when they are done using the db in `dispose()`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Forward RocksDB native metrics to Flink metrics reporter > - > > Key: FLINK-10423 > URL: https://issues.apache.org/jira/browse/FLINK-10423 > Project: Flink > Issue Type: New Feature > Components: Metrics, State Backends, Checkpointing >Reporter: Seth Wiesman >Assignee: Seth Wiesman >Priority: Major > Labels: pull-request-available > > RocksDB contains a number of metrics at the column family level about current > memory usage, open memtables, etc that would be useful to users wishing > greater insight what rocksdb is doing. This work is inspired heavily by the > comments on this rocksdb issue thread > (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646804#comment-16646804 ] ASF GitHub Bot commented on FLINK-10423: StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224536940 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -1400,7 +1433,8 @@ private ColumnFamilyHandle createColumnFamily(String stateName) { ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(nameBytes, columnOptions); try { - return db.createColumnFamily(columnDescriptor); + ColumnFamilyHandle handle = db.createColumnFamily(columnDescriptor); + return handle; Review comment: Unrelated and could be reverted. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Forward RocksDB native metrics to Flink metrics reporter > - > > Key: FLINK-10423 > URL: https://issues.apache.org/jira/browse/FLINK-10423 > Project: Flink > Issue Type: New Feature > Components: Metrics, State Backends, Checkpointing >Reporter: Seth Wiesman >Assignee: Seth Wiesman >Priority: Major > Labels: pull-request-available > > RocksDB contains a number of metrics at the column family level about current > memory usage, open memtables, etc that would be useful to users wishing > greater insight what rocksdb is doing. This work is inspired heavily by the > comments on this rocksdb issue thread > (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
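The pattern discussed across these review comments — one gauge per native RocksDB property, all tracked in a closeable registry and released together from the backend's `dispose()` — can be sketched without the RocksDB and Flink dependencies. `MiniGauge` and `MiniCloseableRegistry` below are simplified stand-ins for Flink's `Gauge` and `CloseableRegistry`, not the real interfaces.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

/** Stand-in for Flink's Gauge; the real interface is Gauge<T> with getValue(). */
interface MiniGauge extends Closeable {
    long getValue();
}

/** Stand-in for Flink's CloseableRegistry: tracks closeables and closes them all together. */
class MiniCloseableRegistry implements Closeable {
    private final List<Closeable> closeables = new ArrayList<>();
    void register(Closeable c) { closeables.add(c); }
    @Override
    public void close() {
        for (Closeable c : closeables) {
            try { c.close(); } catch (Exception ignored) { /* best-effort shutdown */ }
        }
    }
}

/** Sketch of the monitor pattern: one gauge per native property, all released in one close(). */
public class NativeMetricMonitorSketch implements Closeable {
    private final MiniCloseableRegistry registeredGauges = new MiniCloseableRegistry();

    /** A gauge that stops updating once closed, so no native handle is touched after shutdown. */
    static class NativeMetricView implements MiniGauge {
        private volatile boolean open = true;
        private volatile long value;
        void update(long newValue) { if (open) { value = newValue; } } // real code would query RocksDB here
        @Override public long getValue() { return value; }
        @Override public void close() { open = false; }
    }

    /** Register one view per property name; the name would also become the metric's group entry. */
    NativeMetricView registerProperty(String property) {
        NativeMetricView gauge = new NativeMetricView();
        registeredGauges.register(gauge);
        return gauge;
    }

    /** Per the review suggestion, call this from the backend's dispose() for a deterministic shutdown. */
    @Override
    public void close() {
        registeredGauges.close();
    }
}
```

Closing from `dispose()` rather than via the cancel-stream registry (the reviewer's point) makes shutdown deterministic: after `close()`, a stale reporter thread can still read the last value but can no longer reach the native handle.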
[GitHub] StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224531500 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.View; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.ResourceGuard; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A monitor which pull {{@link RocksDB}} native metrics + * and forwards them to Flink's metric group. 
All metrics are + * unsigned longs and are reported at the column family level. + */ +@Internal +public class RocksDBNativeMetricMonitor implements Closeable { + + private final CloseableRegistry registeredGauges; + + private final RocksDB db; + + private final ResourceGuard.Lease lease; + + private final RocksDBNativeMetricOptions options; + + private final MetricGroup metricGroup; + + RocksDBNativeMetricMonitor( + @Nonnull RocksDB db, + @Nonnull ResourceGuard guard, + @Nonnull RocksDBNativeMetricOptions options, + @Nonnull MetricGroup metricGroup + ) throws IOException { + this.db = db; + this.lease = guard.acquireResource(); + this.options = options; + this.metricGroup = metricGroup; + + this.registeredGauges = new CloseableRegistry(); + } + + /** +* Register gauges to pull native metrics for the column family. +* @param columnFamilyName group name for the new gauges +* @param handle native handle to the column family +*/ + void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) { + try { + MetricGroup group = metricGroup.addGroup(columnFamilyName); + + for (String property : options.getProperties()) { + RocksDBNativeMetricView gauge = new RocksDBNativeMetricView( + property, + handle, + db + ); + + group.gauge(property, gauge); + registeredGauges.registerCloseable(gauge); + } + } catch (IOException e) { + throw new FlinkRuntimeException("Unable to register native metrics with RocksDB", e); + } + } + + @Override + public void close() { + IOUtils.closeQuietly(registeredGauges); + IOUtils.closeQuietly(lease); + } + + static class RocksDBNativeMetricView implements Gauge, View, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RocksDBNativeMetricView.class); + + private final String property; + + private final ColumnFamilyHandle handle; + + private final RocksDB db; + + private volatile boolean open; + + private long value; + + private RocksDBNativeMetricView( + @Nonnull String property, + @Nonnull ColumnFamilyHandle 
handle, + @Nonnull RocksDB db + ) { + this.property = property; + this.handle = handle; + this.db = db; +
[GitHub] StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224533004 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.View; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.ResourceGuard; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A monitor which pull {{@link RocksDB}} native metrics + * and forwards them to Flink's metric group. 
All metrics are + * unsigned longs and are reported at the column family level. + */ +@Internal +public class RocksDBNativeMetricMonitor implements Closeable { + + private final CloseableRegistry registeredGauges; + + private final RocksDB db; + + private final ResourceGuard.Lease lease; Review comment: I think a lease is not required if this object is closed in the beginning `RocksDBKeyedStateBackend.dispose()`. The lease is rather required for parallel threads that use the db and we don't know when they are done using the db in `dispose()`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
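The reviewer's point about the lease can be modeled with a toy counting guard. Flink's real `org.apache.flink.util.ResourceGuard` behaves differently in detail (its close blocks until all leases are released); this sketch only captures the invariant being discussed, namely that a lease is what protects concurrent users of the database, while an object closed synchronously at the start of `dispose()` does not need one:

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of the lease protocol: dispose only once no lease is held. */
public class GuardSketch {

    public static class ResourceGuard {
        private final AtomicInteger leases = new AtomicInteger();

        /** Held by a concurrent user while it may still touch the resource. */
        public class Lease implements AutoCloseable {
            { leases.incrementAndGet(); }
            @Override public void close() { leases.decrementAndGet(); }
        }

        public Lease acquireResource() { return new Lease(); }

        /** True only when no concurrent user can still touch the resource. */
        public boolean safeToDispose() { return leases.get() == 0; }
    }
}
```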
[GitHub] StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224529147 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -606,6 +627,17 @@ private RocksDB openDB( Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(), "Not all requested column family handles have been created"); + if (this.metricOptions.isEnabled()) { + this.nativeMetricMonitor = new RocksDBNativeMetricMonitor( + dbRef, + rocksDBResourceGuard, + metricOptions, + operatorMetricGroup + ); + + this.cancelStreamRegistry.registerCloseable(nativeMetricMonitor); Review comment: I think this is not a good way for a clean shutdown. Instead I would suggest to close this in the `dispose()` method. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
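The suggested clean shutdown is to close the monitor deterministically in `dispose()`, before the native database handle is released. A hedged sketch of that ordering (all names are illustrative; the real `RocksDBKeyedStateBackend.dispose()` does considerably more):

```java
/** Illustrates closing dependents before the native resource they poll. */
public class DisposeOrder {

    public static final StringBuilder LOG = new StringBuilder();

    public static class Monitor implements AutoCloseable {
        @Override public void close() { LOG.append("monitor-closed;"); }
    }

    public static class NativeDb implements AutoCloseable {
        @Override public void close() { LOG.append("db-closed;"); }
    }

    public static class Backend {
        private final Monitor monitor = new Monitor();
        private final NativeDb db = new NativeDb();

        /** Close the metric monitor first so no gauge polls a freed handle. */
        public void dispose() {
            monitor.close();
            db.close();
        }
    }
}
```

Registering the monitor with a cancel-stream registry instead, as the original diff does, would tie its lifetime to stream cancellation rather than to the database it reads from, which is the reviewer's objection.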
[jira] [Created] (FLINK-10531) State TTL RocksDb backend end-to-end test end-to-end test failed on Travis
Till Rohrmann created FLINK-10531: - Summary: State TTL RocksDb backend end-to-end test end-to-end test failed on Travis Key: FLINK-10531 URL: https://issues.apache.org/jira/browse/FLINK-10531 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.6.1 Reporter: Till Rohrmann The {{State TTL RocksDb backend end-to-end test}} end-to-end test failed on Travis. https://travis-ci.org/apache/flink/jobs/438226190 https://api.travis-ci.org/v3/job/438226190/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] asfgit closed pull request #6825: [hotfix] Fix typo in WindowedStream.
asfgit closed pull request #6825: [hotfix] Fix typo in WindowedStream. URL: https://github.com/apache/flink/pull/6825 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 871d86fc228..4f8243ccc3a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -1011,7 +1011,7 @@ public WindowedStream(KeyedStream input, * evaluation of the window for each key individually. The output of the window function is * interpreted as a regular non-windowed stream. * -* Not that this function requires that all data in the windows is buffered until the window +* Note that this function requires that all data in the windows is buffered until the window * is evaluated, as the function provides no means of incremental aggregation. * * @param function The window function. @@ -1045,7 +1045,7 @@ public WindowedStream(KeyedStream input, * evaluation of the window for each key individually. The output of the window function is * interpreted as a regular non-windowed stream. * -* Not that this function requires that all data in the windows is buffered until the window +* Note that this function requires that all data in the windows is buffered until the window * is evaluated, as the function provides no means of incremental aggregation. * * @param function The window function. @@ -1063,7 +1063,7 @@ public WindowedStream(KeyedStream input, * evaluation of the window for each key individually. 
The output of the window function is * interpreted as a regular non-windowed stream. * -* Not that this function requires that all data in the windows is buffered until the window +* Note that this function requires that all data in the windows is buffered until the window * is evaluated, as the function provides no means of incremental aggregation. * * @param function The window function. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] StefanRRichter commented on issue #6825: [hotfix] Fix typo in WindowedStream.
StefanRRichter commented on issue #6825: [hotfix] Fix typo in WindowedStream. URL: https://github.com/apache/flink/pull/6825#issuecomment-429019467 LGTM 👍 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646693#comment-16646693 ] Kostas Kloudas commented on FLINK-9592: --- Hi [~kent2171]. Sorry for not posting the design doc earlier. That was a big mistake on my side. Currently we have a new filesystem sink, called `StreamingFileSink`. You can find it on the master branch. I will have a look at the PR just to get an idea of the proposed solution but please open a discussion in the dev mailing list so that other members of the community can comment on it. In the discussion you can include the PR and why you think that this feature is interesting and how you implemented it. This is the only way that this PR can be merged, as described in the contribution guidelines https://flink.apache.org/how-to-contribute.html. In addition you can follow the discussion about the contributions in the dev ML (search for [DISCUSS] [Contributing]). > Notify on moving file into pending/ final state > --- > > Key: FLINK-9592 > URL: https://issues.apache.org/jira/browse/FLINK-9592 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: Rinat Sharipov >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > > Hi mates, I got a proposal about functionality of BucketingSink. > > During implementation of one of our tasks we got the following need - create > a meta-file, with the path and additional information about the file, created > by BucketingSink, when it’s been moved into final place. > Unfortunately such behaviour is currently not available for us. > > We’ve implemented our own Sink, that provides an opportunity to register > notifiers, that will be called, when file state is changing, but current API > doesn’t allow us to add such behaviour using inheritance ... 
> > It seems, that such functionality could be useful, and could be a part of > BucketingSink API > What do you sink, should I make a PR ? > Sincerely yours, > *Rinat Sharipov* > Software Engineer at 1DMP CORE Team > > email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru] > mobile: +7 (925) 416-37-26 > Clever{color:#4f8f00}DATA{color} > make your data clever > > > > Hi, > I see that could be a useful feature. What exactly now is preventing you from > inheriting from BucketingSink? Maybe it would be just enough to make the > BucketingSink easier extendable. > One thing now that could collide with such feature is that Kostas is now > working on larger BucketingSink rework/refactor. > Piotrek > > > Hi guys, thx for your reply. > The following code info is actual for *release-1.5.0 tag, > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class* > > For now, BucketingSink has the following lifecycle of files > > When moving files from opened to pending state: > # on each item (*method* *invoke:434* *line*), we check that suitable bucket > exist, and contain opened file, in case, when opened file doesn’t exist, we > create one, and write item to it > # on each item (*method* *invoke:434* *line*), we check that suitable opened > file doesn’t exceed the limits, and if limits are exceeded, we close it and > move into pending state using *closeCurrentPartFile:568 line - private method* > # on each timer request (*onProcessingTime:482 line*), we check, if items > haven't been added to the opened file longer, than specified period of time, > we close it, using the same private method *closeCurrentPartFile:588 line* > > So, the only way, that we have, is to call our hook from > *closeCurrentPartFile*, that is private, so we copy-pasted the current impl > and injected our logic there > > > Files are moving from pending state into final, during checkpointing > lifecycle, in *notifyCheckpointComplete:657 line*, that is public, and > contains a lot of 
logic, including discovery of files in pending states, > synchronization of state access and it’s modification, etc … > > So we couldn’t override it, or call super method and add some logic, because > when current impl changes the state of files, it removes them from state, and > we don’t have any opportunity to know, > for which files state have been changed. > > To solve such problem, we've created the following interface > > /** > * The \{@code FileStateChangeCallback}is used to perform any additional > operations, when > {@link BucketingSink} > * moves file from one state to another. For more information about state > management of \{@code Bucketing
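The callback the thread proposes could look roughly like the following. The interface name `FileStateChangeCallback` comes from the quoted javadoc; the method signature, the `FileState` enum, and the notifier are assumptions for illustration only:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of the proposed hook: BucketingSink would notify registered
 * listeners whenever it moves a part file between states, so users no
 * longer need to copy-paste closeCurrentPartFile() or
 * notifyCheckpointComplete() to observe transitions.
 */
public class CallbackSketch {

    public enum FileState { OPEN, PENDING, FINAL }

    /** Listener invoked whenever the sink moves a file to a new state. */
    public interface FileStateChangeCallback {
        void onStateChange(String path, FileState newState);
    }

    /** Toy stand-in for the sink side: fans a transition out to listeners. */
    public static class Notifier {
        private final List<FileStateChangeCallback> callbacks = new ArrayList<>();

        public void register(FileStateChangeCallback cb) { callbacks.add(cb); }

        public void moveTo(String path, FileState state) {
            for (FileStateChangeCallback cb : callbacks) {
                cb.onStateChange(path, state);
            }
        }
    }
}
```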
[jira] [Commented] (FLINK-6625) Flink removes HA job data when reaching JobStatus.FAILED
[ https://issues.apache.org/jira/browse/FLINK-6625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646645#comment-16646645 ] Ufuk Celebi commented on FLINK-6625: {quote}Can't a failed checkpoint (unable to commit something, write somewhere, etc.) fail the job? Such an incomplete checkpoint would make HA-recovery impossible.{quote} I think not. The scenario you describe would result in a {{FAILED}} checkpoint and the previous checkpoint would be the latest one in the HA store. > Flink removes HA job data when reaching JobStatus.FAILED > > > Key: FLINK-6625 > URL: https://issues.apache.org/jira/browse/FLINK-6625 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.3.0, 1.4.0 >Reporter: Till Rohrmann >Priority: Major > > Currently, Flink removes all job related data (submitted {{JobGraph}} as well > as checkpoints) when it reaches a globally terminal state (including > {{JobStatus.FAILED}}). In high availability mode, this entails that all data > is removed from ZooKeeper and there is no way to recover the job by > restarting the cluster with the same cluster id. > I think this is problematic, since an application might just have failed > because it has depleted its numbers of restart attempts. Also the last > checkpoint information could be helpful when trying to find out why the job > has actually failed. I propose that we only remove job data when reaching the > state {{JobStatus.SUCCESS}} or {{JobStatus.CANCELED}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
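The proposal in the issue reduces to a predicate over the terminal job status: keep HA data (submitted JobGraph, checkpoints) unless the job ended in `SUCCESS` or `CANCELED`. The enum below is a stand-in for Flink's `JobStatus` using the constant names quoted in the issue:

```java
/**
 * The FLINK-6625 proposal as a predicate. JobStatus is a stand-in enum;
 * the decisive change is that FAILED no longer wipes ZooKeeper state,
 * so a job that merely depleted its restart attempts stays recoverable.
 */
public class HaCleanupPolicy {

    public enum JobStatus { RUNNING, FAILED, SUCCESS, CANCELED }

    /** Remove HA job data only for the two intentionally-terminal states. */
    public static boolean shouldRemoveHaData(JobStatus terminalStatus) {
        return terminalStatus == JobStatus.SUCCESS
            || terminalStatus == JobStatus.CANCELED;
    }
}
```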
[jira] [Closed] (FLINK-10529) Add flink-s3-fs-base to the connectors in the travis stage file.
[ https://issues.apache.org/jira/browse/FLINK-10529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-10529. -- Resolution: Fixed Merged on master with 86f88f71253c8fc916de0dae1c0621072b0321c0. > Add flink-s3-fs-base to the connectors in the travis stage file. > > > Key: FLINK-10529 > URL: https://issues.apache.org/jira/browse/FLINK-10529 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9752) Add an S3 RecoverableWriter
[ https://issues.apache.org/jira/browse/FLINK-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-9752. - Resolution: Fixed Merged on master with 88ea996dda16e340488a2d9f7516bf8337edefdd. > Add an S3 RecoverableWriter > --- > > Key: FLINK-9752 > URL: https://issues.apache.org/jira/browse/FLINK-9752 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: Stephan Ewen >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2 > > > S3 offers persistence only when uploads are complete. That means at the end > of simple uploads and uploads of parts of a MultiPartUpload. > We should implement a RecoverableWriter for S3 that does a MultiPartUpload > with a Part per checkpoint. > Recovering the reader needs the MultiPartUploadID and the list of ETags of > previous parts. > We need additional staging of data in Flink state to work around the fact that > - Parts in a MultiPartUpload must be at least 5MB > - Part sizes must be known up front. (Note that data can still be streamed > in the upload) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
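The staging trick the issue describes can be sketched as follows: because S3 multipart parts (except the last) must be at least 5 MiB, writes are buffered until the threshold is reached before a part is uploaded. The class below only records part sizes; the real recoverable writer additionally persists the MultiPartUpload id and the ETags of completed parts, which this sketch omits:

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of staging for S3 multipart uploads: buffer until the 5 MiB
 * minimum part size is reached, then "upload" a part (here: record its
 * size). Only the final part, flushed on close, may be smaller.
 */
public class MultiPartStaging {

    public static final long MIN_PART_SIZE = 5L * 1024 * 1024;

    private final ByteArrayOutputStream staging = new ByteArrayOutputStream();
    public final List<Integer> uploadedPartSizes = new ArrayList<>();

    public void write(byte[] data) {
        staging.write(data, 0, data.length);
        if (staging.size() >= MIN_PART_SIZE) {
            uploadPart();
        }
    }

    private void uploadPart() {
        uploadedPartSizes.add(staging.size());
        staging.reset();
    }

    /** On close, the final part is allowed to be under the 5 MiB minimum. */
    public void closeAndFlush() {
        if (staging.size() > 0) {
            uploadPart();
        }
    }
}
```

Aligning one part per checkpoint, as the issue suggests, means a recovery only needs the upload id plus the ETag list up to the restored checkpoint.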
[jira] [Commented] (FLINK-9715) Support versioned joins with event time
[ https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646603#comment-16646603 ] ASF GitHub Bot commented on FLINK-9715: --- hequn8128 commented on a change in pull request #6776: [FLINK-9715][table] Support temporal join with event time URL: https://github.com/apache/flink/pull/6776#discussion_r224492754 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala ## @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} +import java.util +import java.util.Comparator + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators._ +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, TableException} +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TypeCheckUtils._ +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ + +/** + * This operator works by keeping on the state collection of probe and build records to process + * on next watermark. The idea is that between watermarks we are collecting those elements + * and once we are sure that there will be no updates we emit the correct result and clean up the + * state. + * + * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined + * as older then the current watermark. Build side is also cleaned up in the similar fashion, + * however we always keep at least one record - the latest one - even if it's past the last + * watermark. + * + * One more trick is how the emitting results and cleaning up is triggered. It is achieved + * by registering timers for the keys. We could register a timer for every probe and build + * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or + * cleaning up the state). However this would cause huge number of registered timers. 
For example + * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if we + * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that + * we always keep only one single registered timer for any given key, registered for the minimal + * value. Upon triggering it, we process all records with event times older then or equal to + * currentWatermark. + */ +class TemporalRowtimeJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig, +leftTimeAttribute: Int, +rightTimeAttribute: Int) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] + with Compiler[FlatJoinFunction[Row, Row, Row]] + with Logging { + + validateEqualsHashCode("join", leftType) + validateEqualsHashCode("join", rightType) + + private val NEXT_LEFT_INDEX_STATE_NAME = "next-index" + private val LEFT_STATE_NAME = "left" + private val RIGHT_STATE_NAME = "right" + private val REGISTERED_TIMER_STATE_NAME = "timer" + private val TIMERS_STATE_NAME = "timers" + + private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute) + + /** +* Incremental index generator for `leftState`'s keys. +*/ + private var nextLeftIndex: ValueState[JLong] = _ + + /** +* Mapping from artificial row index (generated by `nextLeftIndex`) into the left side `Row`. +* We can not use List to accumulate Row
[GitHub] hequn8128 commented on a change in pull request #6776: [FLINK-9715][table] Support temporal join with event time
hequn8128 commented on a change in pull request #6776: [FLINK-9715][table] Support temporal join with event time URL: https://github.com/apache/flink/pull/6776#discussion_r224492754 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala ## @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} +import java.util +import java.util.Comparator + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators._ +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, TableException} +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TypeCheckUtils._ +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ + +/** + * This operator works by keeping in state a collection of probe and build records to process + * on the next watermark. The idea is that between watermarks we are collecting those elements + * and once we are sure that there will be no updates we emit the correct result and clean up the + * state. + * + * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined + * as older than the current watermark. The build side is also cleaned up in a similar fashion, + * however we always keep at least one record - the latest one - even if it's past the last + * watermark. + * + * One more trick is how the emitting of results and cleaning up is triggered. It is achieved + * by registering timers for the keys. We could register a timer for every probe and build + * side element's event time (when the watermark exceeds this timer, that's when we are emitting and/or + * cleaning up the state). However, this would cause a huge number of registered timers. 
For example, + * with the following event times of probe records accumulated: {1, 2, 5, 8, 9}, if we + * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that, + * we always keep only a single registered timer for any given key, registered for the minimal + * value. Upon triggering it, we process all records with event times older than or equal to + * currentWatermark. + */ +class TemporalRowtimeJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig, +leftTimeAttribute: Int, +rightTimeAttribute: Int) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] + with Compiler[FlatJoinFunction[Row, Row, Row]] + with Logging { + + validateEqualsHashCode("join", leftType) + validateEqualsHashCode("join", rightType) + + private val NEXT_LEFT_INDEX_STATE_NAME = "next-index" + private val LEFT_STATE_NAME = "left" + private val RIGHT_STATE_NAME = "right" + private val REGISTERED_TIMER_STATE_NAME = "timer" + private val TIMERS_STATE_NAME = "timers" + + private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute) + + /** +* Incremental index generator for `leftState`'s keys. +*/ + private var nextLeftIndex: ValueState[JLong] = _ + + /** +* Mapping from artificial row index (generated by `nextLeftIndex`) into the left side `Row`. +* We cannot use a List to accumulate Rows, because we need efficient deletes of the oldest rows. +* +* TODO: this could be OrderedMultiMap[JLong, Row] indexed by row's timestamp, to avoid +* full map traversals (if we have lots of rows in state that exceed `currentWatermark`).
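The "one timer per key, registered for the minimal pending event time" technique described in the comment above can be sketched outside of Flink as follows. This is a standalone illustration with made-up names (`SingleTimerBuffer`, `collect`, `onWatermark`), not the actual `TemporalRowtimeJoin` code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketch: buffer records sorted by event time, track only one "registered timer"
// per key (the minimal buffered time), and on each watermark drain everything
// with event time <= watermark, then re-register for the new minimum.
class SingleTimerBuffer {
    private final TreeMap<Long, List<String>> buffered = new TreeMap<>();
    private Long registeredTimer; // at most one pending timer per key

    void collect(long eventTime, String record) {
        buffered.computeIfAbsent(eventTime, t -> new ArrayList<>()).add(record);
        // Register a timer only if none is pending or this time is smaller.
        if (registeredTimer == null || eventTime < registeredTimer) {
            registeredTimer = eventTime;
        }
    }

    /** Emulates the timer firing: processes all records with time <= watermark. */
    List<String> onWatermark(long watermark) {
        List<String> emitted = new ArrayList<>();
        while (!buffered.isEmpty() && buffered.firstKey() <= watermark) {
            emitted.addAll(buffered.pollFirstEntry().getValue());
        }
        // Re-register for the new minimum, if anything is still buffered.
        registeredTimer = buffered.isEmpty() ? null : buffered.firstKey();
        return emitted;
    }
}
```

With event times {1, 2, 5, 8, 9} buffered, a single watermark at 10 drains all five records in one pass instead of firing five separate timers.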
[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector
[ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646580#comment-16646580 ] ASF GitHub Bot commented on FLINK-9083: --- azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224486164 ## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ## @@ -204,23 +206,113 @@ public void go() throws Exception { Thread.sleep(5); } - Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); completableFuture.complete(null); - Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); } - private static class TestCassandraSink extends CassandraSinkBase { + @Test(timeout = 5000) + public void testAcquireOnInvoke() throws Exception { + TestCassandraSink casSinkFunc = new TestCassandraSink(); + casSinkFunc.setMaxConcurrentRequests(1, 5, TimeUnit.MILLISECONDS); + + casSinkFunc.open(new Configuration()); + + CompletableFuture completableFuture = new CompletableFuture<>(); + + casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture)); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + + casSinkFunc.invoke("hello"); + + Assert.assertEquals(0, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); + + completableFuture.complete(null); + + casSinkFunc.close(); + } + + @Test(timeout = 5000) + public void testReleaseOnSuccess() throws Exception { + TestCassandraSink casSinkFunc = new TestCassandraSink(); + casSinkFunc.setMaxConcurrentRequests(1, 5, TimeUnit.MILLISECONDS); + + casSinkFunc.open(new 
Configuration()); + + CompletableFuture completableFuture = new CompletableFuture<>(); + + casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture)); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + + casSinkFunc.invoke("hello"); + + Assert.assertEquals(0, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); + + completableFuture.complete(null); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + + casSinkFunc.close(); + } + + @Test(timeout = 5000) + public void testReleaseOnFailure() throws Exception { + TestCassandraSink casSinkFunc = new TestCassandraSink(); + casSinkFunc.setMaxConcurrentRequests(1, 5, TimeUnit.MILLISECONDS); + + casSinkFunc.open(new Configuration()); + + CompletableFuture completableFuture = new CompletableFuture<>(); + + casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture)); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + + casSinkFunc.invoke("hello"); + + Assert.assertEquals(0, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); + + completableFuture.completeExceptionally(new RuntimeException()); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + } + + @Test(timeout = 5000) + public void testTimeoutExceptionOnInvoke() throws Exception { + TestCassandraSink casSinkFunc = new TestCassandraSink(); + casSinkFunc.setMaxConcurrentRequests(0, 5, TimeUnit.MILLISECONDS); + + casSinkFunc.open(new Configuration()); + + casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(CompletableFuture.completedFuture(null))); + try { + casSinkFunc.invoke("hello"); + Assert.fail("Sending value should 
have experienced a TimeoutException"); + } catc
[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector
[ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646574#comment-16646574 ] ASF GitHub Bot commented on FLINK-9083: --- azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224475206 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ## @@ -354,6 +359,23 @@ protected Cluster buildCluster(Cluster.Builder builder) { return this; } + /** +* Sets the maximum allowed number of concurrent requests for this sink. +* +* This call has no effect if {@link CassandraSinkBuilder#enableWriteAheadLog()} is called. +* +* @param maxConcurrentRequests maximum number of concurrent requests allowed +* @param timeout timeout duration when acquiring a permit to execute +* @param unit timeout unit when acquiring a permit to execute +* @return this builder +*/ + public CassandraSinkBuilder setMaxConcurrentRequests(int maxConcurrentRequests, long timeout, TimeUnit unit) { Review comment: I would also add a shorthand with the default max timeout that basically blocks the task thread: `setMaxConcurrentRequests(int maxConcurrentRequests);` In general, I think users would be more interested in the blocking-indefinitely case, which simply propagates the backpressure upstream and is also visible in the UI. As I understand it, throwing a TimeoutException now fails the job; it will be restarted, will probably fail with a timeout again, and so on. In that case it is better to just block and avoid restarts. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add async backpressure support to Cassandra Connector > - > > Key: FLINK-9083 > URL: https://issues.apache.org/jira/browse/FLINK-9083 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Reporter: Jacob Park >Assignee: Jacob Park >Priority: Minor > Labels: pull-request-available > > As the CassandraSinkBase derivatives utilize async writes, they do not block > the task to introduce any backpressure. > I am currently using a semaphore to provide backpressure support by blocking > at a maximum concurrent requests limit like how DataStax's Spark Cassandra > Connector functions: > [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18] > This improvement has greatly improved the fault-tolerance of our Cassandra > Sink Connector implementation on Apache Flink in production. I would like to > contribute this feature back upstream. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
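The semaphore-based backpressure scheme described in the issue can be sketched as follows. This is a standalone illustration with made-up names (`BoundedAsyncExecutor`, `submit`), not the actual `CassandraSinkBase` code: a `Semaphore` bounds in-flight async writes, the caller blocks once the limit is reached (propagating backpressure upstream), and each completion callback releases a permit.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

// Sketch: bound concurrent async requests with a Semaphore.
class BoundedAsyncExecutor {
    private final Semaphore permits;
    private final ExecutorService executor = Executors.newCachedThreadPool();

    BoundedAsyncExecutor(int maxConcurrentRequests) {
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    /** Blocks until a permit is free, then runs the write asynchronously. */
    CompletableFuture<Void> submit(Runnable write) throws InterruptedException {
        permits.acquire(); // backpressure point: caller blocks at the limit
        return CompletableFuture.runAsync(write, executor)
            .whenComplete((ignored, error) -> permits.release()); // release on success or failure
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

Releasing the permit in `whenComplete` rather than only on success is what keeps the permit count from leaking when a write fails, mirroring the success/failure release behavior discussed in the review.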
[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector
[ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646578#comment-16646578 ] ASF GitHub Bot commented on FLINK-9083: --- azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224487663 ## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ## @@ -204,23 +206,113 @@ public void go() throws Exception { Thread.sleep(5); } - Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); completableFuture.complete(null); - Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); } - private static class TestCassandraSink extends CassandraSinkBase { + @Test(timeout = 5000) + public void testAcquireOnInvoke() throws Exception { + TestCassandraSink casSinkFunc = new TestCassandraSink(); + casSinkFunc.setMaxConcurrentRequests(1, 5, TimeUnit.MILLISECONDS); + + casSinkFunc.open(new Configuration()); + + CompletableFuture completableFuture = new CompletableFuture<>(); + + casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture)); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + + casSinkFunc.invoke("hello"); + + Assert.assertEquals(0, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); + + completableFuture.complete(null); + + casSinkFunc.close(); + } + + @Test(timeout = 5000) + public void testReleaseOnSuccess() throws Exception { + TestCassandraSink casSinkFunc = new TestCassandraSink(); + casSinkFunc.setMaxConcurrentRequests(1, 5, TimeUnit.MILLISECONDS); + + casSinkFunc.open(new 
Configuration()); + + CompletableFuture completableFuture = new CompletableFuture<>(); + + casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture)); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + + casSinkFunc.invoke("hello"); Review comment: The `CompletableFuture`s could be added to a list inside `invoke`, queried from `TestCassandraSink`, and completed later in any order. This way we could check multiple pending invokes. 
[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector
[ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646581#comment-16646581 ] ASF GitHub Bot commented on FLINK-9083: --- azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224475996 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ## @@ -43,70 +48,82 @@ */ public abstract class CassandraSinkBase extends RichSinkFunction implements CheckpointedFunction { protected final Logger log = LoggerFactory.getLogger(getClass()); - protected transient Cluster cluster; - protected transient Session session; - protected transient volatile Throwable exception; - protected transient FutureCallback callback; + // Default Configurations + + /** +* The default maximum number of concurrent requests. By default, {@code Integer.MAX_VALUE}. +*/ + public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE; Review comment: Maybe a minor thing: I would rather say these default constants belong to the builder, where they are actually used, and can serve as their own documentation. 
[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector
[ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646576#comment-16646576 ] ASF GitHub Bot commented on FLINK-9083: --- azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224477067 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ## @@ -43,70 +48,82 @@ */ public abstract class CassandraSinkBase extends RichSinkFunction implements CheckpointedFunction { protected final Logger log = LoggerFactory.getLogger(getClass()); - protected transient Cluster cluster; - protected transient Session session; - protected transient volatile Throwable exception; - protected transient FutureCallback callback; + // Default Configurations + + /** +* The default maximum number of concurrent requests. By default, {@code Integer.MAX_VALUE}. +*/ + public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE; + + /** +* The default timeout duration when acquiring a permit to execute. By default, {@code Long.MAX_VALUE}. +*/ + public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Long.MAX_VALUE; + + /** +* The default timeout unit when acquiring a permit to execute. By default, milliseconds. +*/ + public static final TimeUnit DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS; + + // - Configuration Fields - + + private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS; + private long maxConcurrentRequestsTimeout = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT; + private TimeUnit maxConcurrentRequestsTimeoutUnit = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT; Review comment: These parameters can actually be final, because they are set only during construction. If they would bloat the constructor too much, I would rather create a separate immutable config class for them and for all other possible sink-base options internally. 
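The reviewer's suggestion of a separate immutable config class could look roughly like this. The class name `CassandraSinkConfig` and its shape are illustrative only, not part of the actual connector; the validation mirrors the `Preconditions` checks quoted elsewhere in this thread (zero is allowed, negatives are not):

```java
import java.util.concurrent.TimeUnit;

// Sketch: gather the sink options into a small immutable value object instead
// of widening the CassandraSinkBase constructor. All fields are final.
final class CassandraSinkConfig {
    private final int maxConcurrentRequests;
    private final long maxConcurrentRequestsTimeout;
    private final TimeUnit maxConcurrentRequestsTimeoutUnit;

    CassandraSinkConfig(int maxConcurrentRequests, long timeout, TimeUnit unit) {
        if (maxConcurrentRequests < 0) {
            throw new IllegalArgumentException("maxConcurrentRequests cannot be negative.");
        }
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout cannot be negative.");
        }
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.maxConcurrentRequestsTimeout = timeout;
        this.maxConcurrentRequestsTimeoutUnit = unit;
    }

    int getMaxConcurrentRequests() { return maxConcurrentRequests; }

    long getMaxConcurrentRequestsTimeout() { return maxConcurrentRequestsTimeout; }

    TimeUnit getMaxConcurrentRequestsTimeoutUnit() { return maxConcurrentRequestsTimeoutUnit; }
}
```

An immutable object like this validates once at construction, is trivially thread-safe, and keeps the sink's constructor signature stable as options are added.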
[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector
[ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646583#comment-16646583 ] ASF GitHub Bot commented on FLINK-9083: --- azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224481690 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ## @@ -43,70 +48,82 @@ */ public abstract class CassandraSinkBase extends RichSinkFunction implements CheckpointedFunction { protected final Logger log = LoggerFactory.getLogger(getClass()); - protected transient Cluster cluster; - protected transient Session session; - protected transient volatile Throwable exception; - protected transient FutureCallback callback; + // Default Configurations + + /** +* The default maximum number of concurrent requests. By default, {@code Integer.MAX_VALUE}. +*/ + public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE; + + /** +* The default timeout duration when acquiring a permit to execute. By default, {@code Long.MAX_VALUE}. +*/ + public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Long.MAX_VALUE; + + /** +* The default timeout unit when acquiring a permit to execute. By default, milliseconds. 
+*/ + public static final TimeUnit DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS; + + // - Configuration Fields - + + private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS; + private long maxConcurrentRequestsTimeout = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT; + private TimeUnit maxConcurrentRequestsTimeoutUnit = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT; + + // --- Cassandra Fields --- private final ClusterBuilder builder; - private final AtomicInteger updatesPending = new AtomicInteger(); + protected transient Cluster cluster; + protected transient Session session; + + // Synchronization Fields + + private AtomicReference throwable; + private Semaphore semaphore; + private Phaser phaser; CassandraSinkBase(ClusterBuilder builder) { this.builder = builder; ClosureCleaner.clean(builder, true); } - @Override - public void open(Configuration configuration) { - this.callback = new FutureCallback() { - @Override - public void onSuccess(V ignored) { - int pending = updatesPending.decrementAndGet(); - if (pending == 0) { - synchronized (updatesPending) { - updatesPending.notifyAll(); - } - } - } + // - Sink Methods - + @Override + public void open(Configuration parameters) { + cluster = createCluster(); + session = createSession(); + + throwable = new AtomicReference<>(); + semaphore = new Semaphore(maxConcurrentRequests); + /* +* A Phaser is a flexible and reusable synchronization barrier similar to CyclicBarrier and CountDownLatch. +* +* This Phaser is configured to support "1 + N" parties. +* - "1" for the CassandraSinkBase to arrive and to await at the Phaser during a flush() call. +* - "N" for the varying number of invoke() calls that register and de-register with the Phaser. +* +* The Phaser awaits the completion of the advancement of a phase prior to returning from a register() call. 
+* This behavior ensures that no backlogged invoke() calls register to execute while the Semaphore's permits +* are being released during a flush() call. +*/ + phaser = new Phaser(1) { Review comment: The Phaser solution looks good. I am wondering whether we need such a low-level approach. `send` and `snapshotState` are called synchronously. We could keep a concurrent set of futures and use `FutureUtil.waitForAll` to sync in flush. That looks simpler to me. Is there a particular performance reason to use the `Pha
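The "1 + N parties" Phaser pattern quoted above can be demonstrated in isolation. This is a hedged, standalone sketch (class and method names `PhaserFlushDemo`, `onInvoke`, `flush` are made up, and plain threads stand in for the async Cassandra callbacks): the flushing thread is the one permanently registered party, each in-flight write registers on start and deregisters on completion, so `arriveAndAwaitAdvance()` in `flush()` blocks until all pending writes have finished.

```java
import java.util.concurrent.Phaser;

// Sketch of the "1 + N parties" synchronization barrier described above.
class PhaserFlushDemo {
    private final Phaser phaser = new Phaser(1); // party 1: the flushing thread

    void onInvoke(Runnable asyncWrite) {
        phaser.register(); // one extra party per pending write
        new Thread(() -> {
            try {
                asyncWrite.run();
            } finally {
                phaser.arriveAndDeregister(); // "completion callback" releases the party
            }
        }).start();
    }

    /** Blocks until every registered write has arrived and deregistered. */
    void flush() {
        phaser.arriveAndAwaitAdvance();
    }
}
```

Because `register()` happens synchronously before the write starts, a `flush()` issued afterwards cannot miss it; the phase only advances once every in-flight write has deregistered.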
[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector
[ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646582#comment-16646582 ] ASF GitHub Bot commented on FLINK-9083: --- azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224486780 ## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ## @@ -204,23 +206,113 @@ public void go() throws Exception { Thread.sleep(5); } - Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); completableFuture.complete(null); - Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); } - private static class TestCassandraSink extends CassandraSinkBase { + @Test(timeout = 5000) + public void testAcquireOnInvoke() throws Exception { + TestCassandraSink casSinkFunc = new TestCassandraSink(); + casSinkFunc.setMaxConcurrentRequests(1, 5, TimeUnit.MILLISECONDS); + + casSinkFunc.open(new Configuration()); + + CompletableFuture completableFuture = new CompletableFuture<>(); + + casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture)); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + + casSinkFunc.invoke("hello"); + + Assert.assertEquals(0, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); + + completableFuture.complete(null); + + casSinkFunc.close(); + } + + @Test(timeout = 5000) + public void testReleaseOnSuccess() throws Exception { Review comment: Isn't this case already covered by the two tests at the beginning? 
[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector
[ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646584#comment-16646584 ] ASF GitHub Bot commented on FLINK-9083: --- azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224484698 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ## @@ -127,35 +144,112 @@ public void close() throws Exception { } @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - } + public void initializeState(FunctionInitializationContext context) throws Exception {} @Override - public void snapshotState(FunctionSnapshotContext ctx) throws Exception { + public void snapshotState(FunctionSnapshotContext context) throws Exception { checkAsyncErrors(); - waitForPendingUpdates(); + flush(); checkAsyncErrors(); } - private void waitForPendingUpdates() throws InterruptedException { - synchronized (updatesPending) { - while (updatesPending.get() > 0) { - updatesPending.wait(); + @Override + public void invoke(IN value) throws Exception { + checkAsyncErrors(); + tryAcquire(); + final ListenableFuture result = send(value); + Futures.addCallback(result, new FutureCallback() { + @Override + public void onSuccess(V ignored) { + release(); } + + @Override + public void onFailure(Throwable currentError) { + setAsyncErrors(currentError); + release(); + } + }); + } + + // --- User-Defined Sink Methods -- + + public abstract ListenableFuture send(IN value); + + // - Configuration Methods + + /** +* Sets the maximum allowed number of concurrent requests for this sink. 
+* +* @param maxConcurrentRequests maximum number of concurrent requests allowed +* @param timeout timeout duration when acquiring a permit to execute +* @param unit timeout unit when acquiring a permit to execute +*/ + public void setMaxConcurrentRequests(int maxConcurrentRequests, long timeout, TimeUnit unit) { + Preconditions.checkArgument(maxConcurrentRequests >= 0, "maxConcurrentRequests cannot be negative."); + Preconditions.checkArgument(timeout >= 0, "timeout cannot be negative."); + this.maxConcurrentRequests = maxConcurrentRequests; + this.maxConcurrentRequestsTimeout = timeout; + this.maxConcurrentRequestsTimeoutUnit = unit; + } + + // --- Cassandra Methods -- + + protected Cluster createCluster() { Review comment: can be private atm This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add async backpressure support to Cassandra Connector > - > > Key: FLINK-9083 > URL: https://issues.apache.org/jira/browse/FLINK-9083 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Reporter: Jacob Park >Assignee: Jacob Park >Priority: Minor > Labels: pull-request-available > > As the CassandraSinkBase derivatives utilize async writes, they do not block > the task to introduce any backpressure. > I am currently using a semaphore to provide backpressure support by blocking > at a maximum concurrent requests limit like how DataStax's Spark Cassandra > Connector functions: > [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18] > This improvement has greatly improved the fault-tolerance of our Cassandra > Sink Connector implementation on Apache Flink in production. I would like to > contribute this feature back upstream. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
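The invoke()/callback pairing shown in the diff above — acquire a permit before each async write, release it when the write's future completes, and surface async failures on a later call — can be sketched as a small standalone class. The names below (`ThrottledAsyncSink`, `invoke`, `availablePermits`) are illustrative stand-ins using `CompletableFuture`, not the PR's actual Guava-based types:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch of semaphore-based backpressure for an async sink.
class ThrottledAsyncSink {
    private final Semaphore permits;
    private final long timeout;
    private final TimeUnit unit;
    private final AtomicReference<Throwable> asyncError = new AtomicReference<>();

    ThrottledAsyncSink(int maxConcurrentRequests, long timeout, TimeUnit unit) {
        this.permits = new Semaphore(maxConcurrentRequests);
        this.timeout = timeout;
        this.unit = unit;
    }

    // Blocks (up to the timeout) when maxConcurrentRequests writes are already in flight.
    public void invoke(CompletableFuture<Void> pendingWrite) throws Exception {
        Throwable error = asyncError.get();
        if (error != null) {
            throw new RuntimeException("error in a previous async write", error);
        }
        if (!permits.tryAcquire(timeout, unit)) {
            throw new TimeoutException("could not acquire a permit in time");
        }
        pendingWrite.whenComplete((ignored, failure) -> {
            if (failure != null) {
                asyncError.compareAndSet(null, failure); // reported on the next invoke()
            }
            permits.release(); // frees a slot for the next in-flight request
        });
    }

    public int availablePermits() {
        return permits.availablePermits();
    }
}
```

With one permit, a second `invoke` blocks until the first write's future completes, which is exactly the backpressure behavior the tests in this thread assert via `getAvailablePermits()`/`getAcquiredPermits()`.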
[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector
[ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646577#comment-16646577 ] ASF GitHub Bot commented on FLINK-9083: --- azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224484466 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ## @@ -127,35 +144,112 @@ public void close() throws Exception { } @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - } + public void initializeState(FunctionInitializationContext context) throws Exception {} Review comment: `throws Exception` can be removed This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add async backpressure support to Cassandra Connector > - > > Key: FLINK-9083 > URL: https://issues.apache.org/jira/browse/FLINK-9083 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Reporter: Jacob Park >Assignee: Jacob Park >Priority: Minor > Labels: pull-request-available > > As the CassandraSinkBase derivatives utilize async writes, they do not block > the task to introduce any backpressure. 
> I am currently using a semaphore to provide backpressure support by blocking > at a maximum concurrent requests limit like how DataStax's Spark Cassandra > Connector functions: > [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18] > This improvement has greatly improved the fault-tolerance of our Cassandra > Sink Connector implementation on Apache Flink in production. I would like to > contribute this feature back upstream. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector
[ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646573#comment-16646573 ] ASF GitHub Bot commented on FLINK-9083: --- azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224473345 ## File path: docs/dev/connectors/cassandra.md ## @@ -72,10 +72,13 @@ The following configuration methods can be used: 4. _setMapperOptions(MapperOptions options)_ * Sets the mapper options that are used to configure the DataStax ObjectMapper. * Only applies when processing __POJO__ data types. -5. _enableWriteAheadLog([CheckpointCommitter committer])_ +5. _setMaxConcurrentRequests(int maxConcurrentRequests, long timeout, TimeUnit unit)_ +* Sets the maximum allowed number of concurrent requests with a timeout for acquiring permits to execute. +* Only applies when __enableWriteAheadLog()__ is not configured. Review comment: Potentially, `CassandraTupleWriteAheadSink.sendValues` could also be throttled and send values in batches of concurrent requests instead of trying to flush all values accumulated between checkpoints at once. It could be useful in the same way. Is it just a matter of implementation effort, or are there more concerns about this? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add async backpressure support to Cassandra Connector > - > > Key: FLINK-9083 > URL: https://issues.apache.org/jira/browse/FLINK-9083 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Reporter: Jacob Park >Assignee: Jacob Park >Priority: Minor > Labels: pull-request-available > > As the CassandraSinkBase derivatives utilize async writes, they do not block > the task to introduce any backpressure. > I am currently using a semaphore to provide backpressure support by blocking > at a maximum concurrent requests limit like how DataStax's Spark Cassandra > Connector functions: > [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18] > This improvement has greatly improved the fault-tolerance of our Cassandra > Sink Connector implementation on Apache Flink in production. I would like to > contribute this feature back upstream. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
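The batching idea raised for `CassandraTupleWriteAheadSink.sendValues` — send the accumulated values in bounded batches rather than flushing everything at once — might look roughly like the sketch below. `BatchedSender` and `sendAsync` are hypothetical names, not connector API:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class BatchedSender {
    // Sends values in batches of at most maxConcurrent in-flight requests,
    // waiting for each batch to finish before starting the next one.
    public static <T> void sendInBatches(List<T> values, int maxConcurrent,
                                         Function<T, CompletableFuture<Void>> sendAsync) {
        for (int start = 0; start < values.size(); start += maxConcurrent) {
            int end = Math.min(start + maxConcurrent, values.size());
            CompletableFuture<?>[] batch = values.subList(start, end).stream()
                    .map(sendAsync)
                    .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(batch).join(); // throttle: one batch at a time
        }
    }
}
```

This caps the number of concurrent requests during a write-ahead flush the same way the semaphore caps them in the non-WAL sink path.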
[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector
[ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646579#comment-16646579 ] ASF GitHub Bot commented on FLINK-9083: --- azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224485975 ## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ## @@ -204,23 +206,113 @@ public void go() throws Exception { Thread.sleep(5); } - Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); completableFuture.complete(null); - Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); } - private static class TestCassandraSink extends CassandraSinkBase { + @Test(timeout = 5000) + public void testAcquireOnInvoke() throws Exception { + TestCassandraSink casSinkFunc = new TestCassandraSink(); + casSinkFunc.setMaxConcurrentRequests(1, 5, TimeUnit.MILLISECONDS); + + casSinkFunc.open(new Configuration()); + + CompletableFuture completableFuture = new CompletableFuture<>(); + + casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture)); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + + casSinkFunc.invoke("hello"); + + Assert.assertEquals(0, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); + + completableFuture.complete(null); + + casSinkFunc.close(); + } + + @Test(timeout = 5000) + public void testReleaseOnSuccess() throws Exception { + TestCassandraSink casSinkFunc = new TestCassandraSink(); + casSinkFunc.setMaxConcurrentRequests(1, 5, TimeUnit.MILLISECONDS); + + casSinkFunc.open(new 
Configuration()); + + CompletableFuture completableFuture = new CompletableFuture<>(); + + casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture)); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + + casSinkFunc.invoke("hello"); + + Assert.assertEquals(0, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); + + completableFuture.complete(null); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + + casSinkFunc.close(); + } + + @Test(timeout = 5000) + public void testReleaseOnFailure() throws Exception { + TestCassandraSink casSinkFunc = new TestCassandraSink(); + casSinkFunc.setMaxConcurrentRequests(1, 5, TimeUnit.MILLISECONDS); + + casSinkFunc.open(new Configuration()); + + CompletableFuture completableFuture = new CompletableFuture<>(); + + casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture)); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + + casSinkFunc.invoke("hello"); + + Assert.assertEquals(0, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); + + completableFuture.completeExceptionally(new RuntimeException()); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + } Review comment: also `casSinkFunc.close();`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector
[ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646575#comment-16646575 ] ASF GitHub Bot commented on FLINK-9083: --- azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224486540 ## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ## @@ -204,23 +206,113 @@ public void go() throws Exception { Thread.sleep(5); } - Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); completableFuture.complete(null); - Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); } - private static class TestCassandraSink extends CassandraSinkBase { + @Test(timeout = 5000) + public void testAcquireOnInvoke() throws Exception { + TestCassandraSink casSinkFunc = new TestCassandraSink(); + casSinkFunc.setMaxConcurrentRequests(1, 5, TimeUnit.MILLISECONDS); + + casSinkFunc.open(new Configuration()); + + CompletableFuture completableFuture = new CompletableFuture<>(); + + casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture)); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + + casSinkFunc.invoke("hello"); + + Assert.assertEquals(0, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); + + completableFuture.complete(null); + + casSinkFunc.close(); + } + + @Test(timeout = 5000) + public void testReleaseOnSuccess() throws Exception { + TestCassandraSink casSinkFunc = new TestCassandraSink(); + casSinkFunc.setMaxConcurrentRequests(1, 5, TimeUnit.MILLISECONDS); + + casSinkFunc.open(new 
Configuration()); + + CompletableFuture completableFuture = new CompletableFuture<>(); + + casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture)); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + + casSinkFunc.invoke("hello"); + + Assert.assertEquals(0, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); + + completableFuture.complete(null); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + + casSinkFunc.close(); + } + + @Test(timeout = 5000) + public void testReleaseOnFailure() throws Exception { + TestCassandraSink casSinkFunc = new TestCassandraSink(); + casSinkFunc.setMaxConcurrentRequests(1, 5, TimeUnit.MILLISECONDS); + + casSinkFunc.open(new Configuration()); + + CompletableFuture completableFuture = new CompletableFuture<>(); + + casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture)); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + + casSinkFunc.invoke("hello"); + + Assert.assertEquals(0, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); + + completableFuture.completeExceptionally(new RuntimeException()); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + } + + @Test(timeout = 5000) + public void testTimeoutExceptionOnInvoke() throws Exception { + TestCassandraSink casSinkFunc = new TestCassandraSink(); + casSinkFunc.setMaxConcurrentRequests(0, 5, TimeUnit.MILLISECONDS); + + casSinkFunc.open(new Configuration()); + + casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(CompletableFuture.completedFuture(null))); + try { + casSinkFunc.invoke("hello"); + Assert.fail("Sending value should 
have experienced a TimeoutException"); } catch …
[GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224486780 ## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ## @@ -204,23 +206,113 @@ public void go() throws Exception { Thread.sleep(5); } - Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); completableFuture.complete(null); - Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); } - private static class TestCassandraSink extends CassandraSinkBase { + @Test(timeout = 5000) + public void testAcquireOnInvoke() throws Exception { + TestCassandraSink casSinkFunc = new TestCassandraSink(); + casSinkFunc.setMaxConcurrentRequests(1, 5, TimeUnit.MILLISECONDS); + + casSinkFunc.open(new Configuration()); + + CompletableFuture completableFuture = new CompletableFuture<>(); + + casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture)); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + + casSinkFunc.invoke("hello"); + + Assert.assertEquals(0, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); + + completableFuture.complete(null); + + casSinkFunc.close(); + } + + @Test(timeout = 5000) + public void testReleaseOnSuccess() throws Exception { Review comment: Is not this case already covered with the following 2 tests in the beginning? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224481690 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ## @@ -43,70 +48,82 @@ */ public abstract class CassandraSinkBase extends RichSinkFunction implements CheckpointedFunction { protected final Logger log = LoggerFactory.getLogger(getClass()); - protected transient Cluster cluster; - protected transient Session session; - protected transient volatile Throwable exception; - protected transient FutureCallback callback; + // Default Configurations + + /** +* The default maximum number of concurrent requests. By default, {@code Integer.MAX_VALUE}. +*/ + public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE; + + /** +* The default timeout duration when acquiring a permit to execute. By default, {@code Long.MAX_VALUE}. +*/ + public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Long.MAX_VALUE; + + /** +* The default timeout unit when acquiring a permit to execute. By default, milliseconds. 
+*/ + public static final TimeUnit DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS; + + // - Configuration Fields - + + private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS; + private long maxConcurrentRequestsTimeout = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT; + private TimeUnit maxConcurrentRequestsTimeoutUnit = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT; + + // --- Cassandra Fields --- private final ClusterBuilder builder; - private final AtomicInteger updatesPending = new AtomicInteger(); + protected transient Cluster cluster; + protected transient Session session; + + // Synchronization Fields + + private AtomicReference throwable; + private Semaphore semaphore; + private Phaser phaser; CassandraSinkBase(ClusterBuilder builder) { this.builder = builder; ClosureCleaner.clean(builder, true); } - @Override - public void open(Configuration configuration) { - this.callback = new FutureCallback() { - @Override - public void onSuccess(V ignored) { - int pending = updatesPending.decrementAndGet(); - if (pending == 0) { - synchronized (updatesPending) { - updatesPending.notifyAll(); - } - } - } + // - Sink Methods - + @Override + public void open(Configuration parameters) { + cluster = createCluster(); + session = createSession(); + + throwable = new AtomicReference<>(); + semaphore = new Semaphore(maxConcurrentRequests); + /* +* A Phaser is a flexible and reusable synchronization barrier similar to CyclicBarrier and CountDownLatch. +* +* This Phaser is configured to support "1 + N" parties. +* - "1" for the CassandraSinkBase to arrive and to await at the Phaser during a flush() call. +* - "N" for the varying number of invoke() calls that register and de-register with the Phaser. +* +* The Phaser awaits the completion of the advancement of a phase prior to returning from a register() call. 
+* This behavior ensures that no backlogged invoke() calls register to execute while the Semaphore's permits +* are being released during a flush() call. +*/ + phaser = new Phaser(1) { Review comment: Phaser solution looks good. I am wondering whether we need such a low-level approach. `send` and `snapshotState` are called synchronously. We could have a concurrent set of futures and use `FutureUtil.waitForAll` to sync in flush. This looks simpler to me. Is there a particular performance reason to use the `Phaser`? I would also abstract semaphore/Phaser/futures/addFuture/waitForAll/error away into another reusable and pluggable component. It can also be tested separately. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
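The alternative the reviewer describes — a concurrent set of pending futures that flush() waits on, instead of a `Phaser` — could be prototyped as below. `FutureUtil.waitForAll` is the Flink utility mentioned; here plain `CompletableFuture.allOf` plays that role, and `PendingWrites` is an invented name. This relies on invoke() and snapshotState() being called from the same task thread, as noted in the comment:

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class PendingWrites {
    private final Set<CompletableFuture<?>> pending = ConcurrentHashMap.newKeySet();

    // Called from invoke(): track the future until it completes.
    public void add(CompletableFuture<?> future) {
        pending.add(future);
        future.whenComplete((result, error) -> pending.remove(future));
    }

    // Called from snapshotState(): block until all in-flight writes are done.
    public void flush() {
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    }

    public int size() {
        return pending.size();
    }
}
```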
[GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224477067 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ## @@ -43,70 +48,82 @@ */ public abstract class CassandraSinkBase extends RichSinkFunction implements CheckpointedFunction { protected final Logger log = LoggerFactory.getLogger(getClass()); - protected transient Cluster cluster; - protected transient Session session; - protected transient volatile Throwable exception; - protected transient FutureCallback callback; + // Default Configurations + + /** +* The default maximum number of concurrent requests. By default, {@code Integer.MAX_VALUE}. +*/ + public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE; + + /** +* The default timeout duration when acquiring a permit to execute. By default, {@code Long.MAX_VALUE}. +*/ + public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Long.MAX_VALUE; + + /** +* The default timeout unit when acquiring a permit to execute. By default, milliseconds. +*/ + public static final TimeUnit DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS; + + // - Configuration Fields - + + private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS; + private long maxConcurrentRequestsTimeout = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT; + private TimeUnit maxConcurrentRequestsTimeoutUnit = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT; Review comment: These parameters can be final actually because they are set only during construction. If they would explode constructor too much, I would rather create a separate immutable config class for them and all other possible sink base options internally. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
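The separate immutable config class suggested in the comment above might look like this minimal sketch. `SinkConcurrencyConfig` is a hypothetical name; the defaults and the non-negativity checks mirror the constants and `Preconditions` calls in the diff:

```java
import java.util.concurrent.TimeUnit;

// Immutable holder for the sink's concurrency settings; all fields are final,
// so they are fixed at construction time and safe to share.
final class SinkConcurrencyConfig {
    private final int maxConcurrentRequests;
    private final long timeout;
    private final TimeUnit timeoutUnit;

    SinkConcurrencyConfig(int maxConcurrentRequests, long timeout, TimeUnit timeoutUnit) {
        if (maxConcurrentRequests < 0) {
            throw new IllegalArgumentException("maxConcurrentRequests cannot be negative.");
        }
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout cannot be negative.");
        }
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    // Matches the defaults in the diff: effectively unbounded requests and timeout.
    public static SinkConcurrencyConfig defaults() {
        return new SinkConcurrencyConfig(Integer.MAX_VALUE, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    public int maxConcurrentRequests() { return maxConcurrentRequests; }
    public long timeout() { return timeout; }
    public TimeUnit timeoutUnit() { return timeoutUnit; }
}
```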
[GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224475206 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ## @@ -354,6 +359,23 @@ protected Cluster buildCluster(Cluster.Builder builder) { return this; } + /** +* Sets the maximum allowed number of concurrent requests for this sink. +* +* This call has no effect if {@link CassandraSinkBuilder#enableWriteAheadLog()} is called. +* +* @param maxConcurrentRequests maximum number of concurrent requests allowed +* @param timeout timeout duration when acquiring a permit to execute +* @param unit timeout unit when acquiring a permit to execute +* @return this builder +*/ + public CassandraSinkBuilder setMaxConcurrentRequests(int maxConcurrentRequests, long timeout, TimeUnit unit) { Review comment: I would also add a shorthand with the default max timeout, basically blocking the task thread: `setMaxConcurrentRequests(int maxConcurrentRequests);` In general, I think users would be more interested in the blocking-indefinitely case, which just propagates the backpressure upstream and can also be seen in the UI. As I understand it, throwing a TimeoutException now will fail the job; it will be restarted, probably fail with a timeout again, and so on. Then it is better to just block and avoid restarts. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
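The proposed shorthand overload would simply delegate with the maximum timeout, so permit acquisition blocks indefinitely and backpressure propagates upstream instead of failing the job with a TimeoutException. A sketch outside the real builder, with `ConcurrencySettings` as an invented stand-in:

```java
import java.util.concurrent.TimeUnit;

// Invented stand-in for the builder's concurrency settings,
// showing how the one-argument shorthand could delegate.
class ConcurrencySettings {
    private int maxConcurrentRequests = Integer.MAX_VALUE;
    private long timeout = Long.MAX_VALUE;
    private TimeUnit unit = TimeUnit.MILLISECONDS;

    // Shorthand: block indefinitely instead of ever timing out.
    public ConcurrencySettings setMaxConcurrentRequests(int maxConcurrentRequests) {
        return setMaxConcurrentRequests(maxConcurrentRequests, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    public ConcurrencySettings setMaxConcurrentRequests(int maxConcurrentRequests, long timeout, TimeUnit unit) {
        if (maxConcurrentRequests < 0 || timeout < 0) {
            throw new IllegalArgumentException("arguments cannot be negative");
        }
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.timeout = timeout;
        this.unit = unit;
        return this;
    }

    public int getMaxConcurrentRequests() { return maxConcurrentRequests; }
    public long getTimeout() { return timeout; }
}
```

Blocking indefinitely turns a slow Cassandra cluster into visible upstream backpressure rather than a restart loop, which is the trade-off the reviewer argues for.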
[GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224486164 ## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ## @@ -204,23 +206,113 @@ public void go() throws Exception { Thread.sleep(5); } - Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); completableFuture.complete(null); - Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); } - private static class TestCassandraSink extends CassandraSinkBase { + @Test(timeout = 5000) + public void testAcquireOnInvoke() throws Exception { + TestCassandraSink casSinkFunc = new TestCassandraSink(); + casSinkFunc.setMaxConcurrentRequests(1, 5, TimeUnit.MILLISECONDS); + + casSinkFunc.open(new Configuration()); + + CompletableFuture completableFuture = new CompletableFuture<>(); + + casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture)); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + + casSinkFunc.invoke("hello"); + + Assert.assertEquals(0, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); + + completableFuture.complete(null); + + casSinkFunc.close(); + } + + @Test(timeout = 5000) + public void testReleaseOnSuccess() throws Exception { + TestCassandraSink casSinkFunc = new TestCassandraSink(); + casSinkFunc.setMaxConcurrentRequests(1, 5, TimeUnit.MILLISECONDS); + + casSinkFunc.open(new Configuration()); + + CompletableFuture completableFuture = new CompletableFuture<>(); + + casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture)); + + Assert.assertEquals(1, 
casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + + casSinkFunc.invoke("hello"); + + Assert.assertEquals(0, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); + + completableFuture.complete(null); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + + casSinkFunc.close(); + } + + @Test(timeout = 5000) + public void testReleaseOnFailure() throws Exception { + TestCassandraSink casSinkFunc = new TestCassandraSink(); + casSinkFunc.setMaxConcurrentRequests(1, 5, TimeUnit.MILLISECONDS); + + casSinkFunc.open(new Configuration()); + + CompletableFuture completableFuture = new CompletableFuture<>(); + + casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(completableFuture)); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + + casSinkFunc.invoke("hello"); + + Assert.assertEquals(0, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(1, casSinkFunc.getAcquiredPermits()); + + completableFuture.completeExceptionally(new RuntimeException()); + + Assert.assertEquals(1, casSinkFunc.getAvailablePermits()); + Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); + } + + @Test(timeout = 5000) + public void testTimeoutExceptionOnInvoke() throws Exception { + TestCassandraSink casSinkFunc = new TestCassandraSink(); + casSinkFunc.setMaxConcurrentRequests(0, 5, TimeUnit.MILLISECONDS); + + casSinkFunc.open(new Configuration()); + + casSinkFunc.setResultSetFuture(ResultSetFutures.fromCompletableFuture(CompletableFuture.completedFuture(null))); + try { + casSinkFunc.invoke("hello"); + Assert.fail("Sending value should have experienced a TimeoutException"); + } catch (Exception e) { + Assert.assertTrue(e instanceof TimeoutException); + } + } Review comment: Should `casSinkFunc.close()` also be called at the end of this test?
[GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224475996 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ## @@ -43,70 +48,82 @@ */ public abstract class CassandraSinkBase extends RichSinkFunction implements CheckpointedFunction { protected final Logger log = LoggerFactory.getLogger(getClass()); - protected transient Cluster cluster; - protected transient Session session; - protected transient volatile Throwable exception; - protected transient FutureCallback callback; + // Default Configurations + + /** +* The default maximum number of concurrent requests. By default, {@code Integer.MAX_VALUE}. +*/ + public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE; Review comment: A minor point, perhaps, but I would say these default constants belong in the builder, where they are actually used and can serve as their own documentation.
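The suggestion to move the defaults onto the builder could look roughly like this (a hypothetical sketch; `SinkConfigBuilder` and its field names are invented for illustration):

```java
import java.util.concurrent.TimeUnit;

// Hypothetical builder keeping its default constants next to the setter that
// uses them, as suggested in the review; names are invented for illustration.
class SinkConfigBuilder {
    // Defaults document themselves right where they are applied.
    private int maxConcurrentRequests = Integer.MAX_VALUE;
    private long maxConcurrentRequestsTimeout = Long.MAX_VALUE;
    private TimeUnit maxConcurrentRequestsTimeoutUnit = TimeUnit.MILLISECONDS;

    SinkConfigBuilder setMaxConcurrentRequests(int maxConcurrentRequests, long timeout, TimeUnit unit) {
        if (maxConcurrentRequests <= 0) {
            throw new IllegalArgumentException("maxConcurrentRequests must be positive");
        }
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.maxConcurrentRequestsTimeout = timeout;
        this.maxConcurrentRequestsTimeoutUnit = unit;
        return this;
    }

    int getMaxConcurrentRequests() {
        return maxConcurrentRequests;
    }
}
```

With this layout, a reader of the builder sees the default and the validation for each option in one place, instead of having to look them up on the sink base class.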
[GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224486540 ## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ## @@ -204,23 +206,113 @@ (quoting the same test hunk as the comment above) + private static class TestCassandraSink extends CassandraSinkBase { Review comment:
`TestCassandraSink` could implement `AutoCloseable` so it can be used with try-with-resources.
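Making the test sink `AutoCloseable`, as suggested, lets each test guarantee `close()` via try-with-resources even when an assertion throws mid-test. A minimal hypothetical sketch (the class name is invented for illustration):

```java
// Hypothetical AutoCloseable test sink, per the review suggestion; a
// try-with-resources block then guarantees close() even if an assertion throws.
class CloseableTestSink implements AutoCloseable {
    private boolean closed = false;

    void invoke(String value) {
        // the real sink would issue an asynchronous write here
    }

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}
```

Tests would then read `try (CloseableTestSink sink = new CloseableTestSink()) { ... }` instead of calling `close()` manually at the end of each test method.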
[GitHub] azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r224487663 ## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ## @@ -204,23 +206,113 @@ (quoting the same test hunk as the comment above) + casSinkFunc.invoke("hello"); Review comment: The `CompletableFuture`s could be collected in a list inside `invoke`, queried from `TestCassandraSink`, and completed later in any order. This way we could check multiple pending invokes.
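The suggestion above, recording one future per `invoke` and completing them later in any order, could be sketched as follows (hypothetical names; the real test sink wraps Cassandra's `ResultSetFuture` rather than a bare `CompletableFuture`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

// Hypothetical test sink that records one CompletableFuture per invoke, so a
// test can complete them later in any order and check the permit accounting.
class RecordingSink {
    private final Semaphore permits;
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();

    RecordingSink(int maxConcurrentRequests) {
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    void invoke(String value) throws InterruptedException {
        permits.acquire();
        CompletableFuture<Void> future = new CompletableFuture<>();
        // Each future releases its permit when completed, successfully or not.
        future.whenComplete((result, error) -> permits.release());
        pending.add(future);
    }

    List<CompletableFuture<Void>> getPendingFutures() {
        return pending;
    }

    int getAvailablePermits() {
        return permits.availablePermits();
    }
}
```

Completing the recorded futures out of order exercises the case the single-future tests above cannot: several requests in flight at once, released in an order the sink does not control.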
[jira] [Resolved] (FLINK-9788) ExecutionGraph Inconsistency prevents Job from recovering
[ https://issues.apache.org/jira/browse/FLINK-9788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-9788. -- Resolution: Fixed Fixed via 1.7.0: https://github.com/apache/flink/commit/5ae68f85fa4182d94608583b133e8162c4f8f07d 1.6.2: https://github.com/apache/flink/commit/15a90891a840aff53260a8819e1120cd310f7336 1.5.5: https://github.com/apache/flink/commit/d13015e23c805e1aaefdc7d8037f2fa87ea74830 > ExecutionGraph Inconsistency prevents Job from recovering > - > > Key: FLINK-9788 > URL: https://issues.apache.org/jira/browse/FLINK-9788 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.6.0 > Environment: Rev: 4a06160 > Hadoop 2.8.3 >Reporter: Gary Yao >Assignee: Till Rohrmann >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > Attachments: jobmanager_5000.log > > > Deployment mode: YARN job mode with HA > After killing many TaskManagers in succession, the state of the > ExecutionGraph ran into an inconsistent state, which prevented job recovery. > The following stacktrace was logged in the JobManager log several hundred > times per second: > {noformat} > -08 16:47:18,855 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph >- Job General purpose test job (37a794195840700b98feb23e99f7ea24) > switched from state RESTARTING to RESTARTING. > 2018-07-08 16:47:18,856 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- Restarting > the job General purpose test job (37a794195840700b98feb23e99f7ea24). > 2018-07-08 16:47:18,857 DEBUG > org.apache.flink.runtime.executiongraph.ExecutionGraph- Resetting > execution vertex Source: Custom Source -> Timestamps/Watermarks (1/10) for > new execution. > 2018-07-08 16:47:18,857 WARN > org.apache.flink.runtime.executiongraph.ExecutionGraph- Failed to > restart the job. 
> java.lang.IllegalStateException: Cannot reset a vertex that is in > non-terminal state CREATED > at > org.apache.flink.runtime.executiongraph.ExecutionVertex.resetForNewExecution(ExecutionVertex.java:610) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.resetForNewExecution(ExecutionJobVertex.java:573) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1251) > at > org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59) > at > org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {noformat} > The resulting jobmanager log file was 4.7 GB in size. Find attached the first > 5000 lines of the log file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann closed pull request #6812: [BP-1.5][FLINK-9788] Fix ExecutionGraph inconsistency for global failures when restarting
tillrohrmann closed pull request #6812: [BP-1.5][FLINK-9788] Fix ExecutionGraph inconsistency for global failures when restarting URL: https://github.com/apache/flink/pull/6812 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index acb1e16fe71..0be1ff27420 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -1151,18 +1151,7 @@ public void failGlobal(Throwable t) { current == JobStatus.SUSPENDED || current.isGloballyTerminalState()) { return; - } - else if (current == JobStatus.RESTARTING) { - // we handle 'failGlobal()' while in 'RESTARTING' as a safety net in case something - // has gone wrong in 'RESTARTING' and we need to re-attempt the restarts - initFailureCause(t); - - final long globalVersionForRestart = incrementGlobalModVersion(); - if (tryRestartOrFail(globalVersionForRestart)) { - return; - } - } - else if (transitionState(current, JobStatus.FAILING, t)) { + } else if (transitionState(current, JobStatus.FAILING, t)) { initFailureCause(t); // make sure no concurrent local or global actions interfere with the failover @@ -1240,7 +1229,7 @@ public void restart(long expectedGlobalVersion) { colGroups.add(cgroup); } - jv.resetForNewExecution(resetTimestamp, globalModVersion); + jv.resetForNewExecution(resetTimestamp, expectedGlobalVersion); } for (int i = 0; i < stateTimestamps.length; i++) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 9b98de78143..91510d1af54 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.ScheduledExecutor; @@ -57,12 +58,15 @@ import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; import org.junit.After; import org.junit.Test; +import javax.annotation.Nonnull; + import java.io.IOException; import java.net.InetAddress; import java.util.Iterator; @@ -86,15 +90,20 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.finishAllVertices; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.switchToRunning; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.spy; +/** + * Tests the restart behaviour of the {@link ExecutionGraph}. 
+ */ public class ExecutionGraphRestartTest extends TestLogger { - private final static int NUM_TASKS = 31; + private static final int NUM_TASKS = 31; private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4); @@ -113,9 +122
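The diff above replaces the live `globalModVersion` with the `expectedGlobalVersion` captured when the restart was scheduled. The underlying idea can be illustrated with a simplified, hypothetical version guard (this is not Flink's actual code): a restart scheduled under an older global modification version is dropped once a newer global failure has incremented the version in the meantime.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified, hypothetical sketch of the version guard behind the fix: a
// restart carries the global modification version captured when it was
// scheduled, and is dropped if a newer global failure incremented it since.
class VersionedRestarter {
    private final AtomicLong globalModVersion = new AtomicLong(0);
    private int restarts = 0;

    /** A global failure bumps the version, invalidating pending restarts. */
    long failGlobal() {
        return globalModVersion.incrementAndGet();
    }

    /** Executes the restart only if no newer failure superseded it. */
    boolean restart(long expectedGlobalVersion) {
        if (globalModVersion.get() != expectedGlobalVersion) {
            return false; // stale restart; a newer recovery is in flight
        }
        restarts++;
        return true;
    }

    int getRestarts() {
        return restarts;
    }
}
```

With such a guard, two racing recovery attempts cannot both reset the same vertices, which is the kind of inconsistency FLINK-9788 reports.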
[GitHub] tillrohrmann closed pull request #6811: [BP-1.6][FLINK-9788] Fix ExecutionGraph inconsistency for global failures when restarting
tillrohrmann closed pull request #6811: [BP-1.6][FLINK-9788] Fix ExecutionGraph inconsistency for global failures when restarting URL: https://github.com/apache/flink/pull/6811 This is a PR merged from a forked repository. The diff is identical to the one shown above for pull request #6812 (the same fix, backported to the 1.6 branch).
[jira] [Commented] (FLINK-9788) ExecutionGraph Inconsistency prevents Job from recovering
[ https://issues.apache.org/jira/browse/FLINK-9788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646551#comment-16646551 ] ASF GitHub Bot commented on FLINK-9788: --- tillrohrmann closed pull request #6810: [FLINK-9788] Fix ExecutionGraph inconsistency for global failures when restarting URL: https://github.com/apache/flink/pull/6810 This is a PR merged from a forked repository. The diff is identical to the one shown above for pull request #6812.