[jira] [Commented] (FLINK-6392) Change the alias of Window from optional to essential.
[ https://issues.apache.org/jira/browse/FLINK-6392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15986023#comment-15986023 ]

ASF GitHub Bot commented on FLINK-6392:
---
GitHub user sunjincheng121 opened a pull request:

https://github.com/apache/flink/pull/3786

[FLINK-6392][table] Change the alias of Window from optional to essen…

In this PR, the API definition is refactored to use the type system to enforce the window alias constraint.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-6392][table] Change the alias of Window from optional to essential.")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sunjincheng121/flink FLINK-6392-PR

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3786.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3786

commit a9a2c0e8473da287cb5f77ba48ed9d5048e6a3a7
Author: sunjincheng121
Date: 2017-04-27T01:58:18Z

    [FLINK-6392][table] Change the alias of Window from optional to essential.

> Change the alias of Window from optional to essential.
> --
>
> Key: FLINK-6392
> URL: https://issues.apache.org/jira/browse/FLINK-6392
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: sunjincheng
> Assignee: sunjincheng
> Fix For: 1.3.0
>
> Currently, the window clause is used as follows:
> {code}
> tab //Table('a,'b,'c)
>    .window( Slide over 10.milli every 5.milli as 'w)
>    .groupBy('w,'a,'b)
>    .select('a, 'b, 'c.sum, 'w.start, 'w.end)
> {code}
> As shown, the window alias is essential, but the current Table API
> implementation does not enforce this constraint. We should therefore refactor
> the API definition to use the type system to enforce the alias.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
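The "use the type system to enforce the alias" idea can be illustrated with a minimal type-state sketch. The classes below are hypothetical stand-ins, not the actual Flink Table API: by making `groupBy` accept only an aliased window type, forgetting the alias becomes a compile-time error instead of a runtime one.

```java
// Hypothetical sketch of enforcing a window alias via the type system.
// These classes are illustrative only; they are not the Flink Table API.

// A window definition without an alias: it cannot be passed to groupBy.
class SlideWindow {
    final long size, slide;
    SlideWindow(long size, long slide) { this.size = size; this.slide = slide; }

    // The only way to obtain a groupBy-able window is to give it an alias.
    AliasedWindow as(String alias) { return new AliasedWindow(this, alias); }
}

// A window with an alias: the only type groupBy accepts.
class AliasedWindow {
    final SlideWindow window;
    final String alias;
    AliasedWindow(SlideWindow window, String alias) { this.window = window; this.alias = alias; }
}

class Table {
    // groupBy requires an AliasedWindow; a bare SlideWindow will not compile here.
    String groupBy(AliasedWindow w, String... fields) {
        return "groupBy(" + w.alias + ", " + String.join(",", fields) + ")";
    }
}

public class WindowAliasSketch {
    public static void main(String[] args) {
        Table tab = new Table();
        // tab.groupBy(new SlideWindow(10, 5), "a");   // would not compile: alias missing
        System.out.println(tab.groupBy(new SlideWindow(10, 5).as("w"), "a", "b"));
    }
}
```

The same pattern applies in Scala, where the PR can express the constraint directly in the fluent API's return types.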
[GitHub] flink pull request #3786: [FLINK-6392][table] Change the alias of Window fro...
GitHub user sunjincheng121 opened a pull request:

https://github.com/apache/flink/pull/3786

[FLINK-6392][table] Change the alias of Window from optional to essen…

In this PR, the API definition is refactored to use the type system to enforce the window alias constraint.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-6392][table] Change the alias of Window from optional to essential.")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sunjincheng121/flink FLINK-6392-PR

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3786.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3786

commit a9a2c0e8473da287cb5f77ba48ed9d5048e6a3a7
Author: sunjincheng121
Date: 2017-04-27T01:58:18Z

    [FLINK-6392][table] Change the alias of Window from optional to essential.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[jira] [Commented] (FLINK-6387) Flink UI support access log
[ https://issues.apache.org/jira/browse/FLINK-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985992#comment-15985992 ]

ASF GitHub Bot commented on FLINK-6387:
---
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3777

> I can see this flooding the logs like crazy, especially with things like metrics and watermarks that update often. Some kind of filtering is probably necessary here.

You are right. The noisy log entries can be separated into a new log file.

> This seems to be about auditing, so completely different...

The main purpose is to record every access, like an audit log.

> Flink UI support access log
> ---
>
> Key: FLINK-6387
> URL: https://issues.apache.org/jira/browse/FLINK-6387
> Project: Flink
> Issue Type: Improvement
> Components: Webfrontend
> Reporter: shijinkui
> Assignee: shijinkui
>
> Record user requests in an access log, appending each access to the log file.
[GitHub] flink issue #3777: [FLINK-6387] [webfrontend]Flink UI support access log
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3777

> I can see this flooding the logs like crazy, especially with things like metrics and watermarks that update often. Some kind of filtering is probably necessary here.

You are right. The noisy log entries can be separated into a new log file.

> This seems to be about auditing, so completely different...

The main purpose is to record every access, like an audit log.
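The filtering discussed above might look like the following sketch: route frequently-polled endpoints (metrics, watermarks) to a separate access-log file so the main audit log is not flooded. The endpoint paths and file names are illustrative assumptions, not Flink's actual webfrontend code.

```java
import java.util.List;

// Hypothetical sketch of splitting the access log: noisy polling endpoints
// go to a separate file; everything else goes to the main audit log.
// Paths and file names are assumptions for illustration only.
public class AccessLogFilter {
    private static final List<String> NOISY_PREFIXES =
            List.of("/metrics", "/watermarks");

    // Decide which log file a request path belongs in.
    static String targetLog(String path) {
        for (String prefix : NOISY_PREFIXES) {
            if (path.startsWith(prefix)) {
                return "access-noisy.log";   // high-frequency polling endpoints
            }
        }
        return "access.log";                 // regular user accesses (audit trail)
    }

    public static void main(String[] args) {
        System.out.println(targetLog("/jobs/42"));
        System.out.println(targetLog("/metrics/cpu"));
    }
}
```

In practice the same split could be achieved purely in logging configuration (e.g. separate appenders per logger name) rather than in code.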
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985934#comment-15985934 ]

ASF GitHub Bot commented on FLINK-6288:
---
Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3766

@gyfora For the method `int partition(T next, byte[] serializedKey, byte[] serializedValue, int partitionNum)` in KafkaPartitioner, the correct partition count of the target topic can be used. But the KafkaPartitioner's partition id array is initialized in `void open(int parallelInstanceId, int parallelInstances, int[] partitions)`, which is executed only once. So yes, the problem with dynamically created topics will still exist for users of the older KafkaPartitioner API in existing jobs, and I find it hard to solve this problem completely. What do you think? @tzulitai

> FlinkKafkaProducer's custom Partitioner is always invoked with number of
> partitions of default topic
>
> Key: FLINK-6288
> URL: https://issues.apache.org/jira/browse/FLINK-6288
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Fang Yong
>
> The {{FlinkKafkaProducerBase}} supports routing records to topics besides the
> default topic, but the custom {{Partitioner}} interface does not follow this
> semantic.
> The partitioner's {{partition}} method is always invoked with the number of
> partitions in the default topic, and not the number of partitions of the
> current {{targetTopic}}.
[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...
Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3766

@gyfora For the method `int partition(T next, byte[] serializedKey, byte[] serializedValue, int partitionNum)` in KafkaPartitioner, the correct partition count of the target topic can be used. But the KafkaPartitioner's partition id array is initialized in `void open(int parallelInstanceId, int parallelInstances, int[] partitions)`, which is executed only once. So yes, the problem with dynamically created topics will still exist for users of the older KafkaPartitioner API in existing jobs, and I find it hard to solve this problem completely. What do you think? @tzulitai
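The distinction in the comment above can be sketched as follows. The two tiny classes are simplified stand-ins, not the real Flink interfaces: a partitioner that freezes the partition count in `open()` keeps using the default topic's count, while one that uses the `partitionNum` argument passed with each record adapts to whatever topic the record targets.

```java
// Simplified sketch of the FLINK-6288 discussion (not the actual Flink API):
// compare a partitioner with an open()-time frozen partition count against
// one using the per-call partitionNum.
public class PartitionerSketch {

    // Old style: partition set fixed once in open(), from the default topic.
    static class CachingPartitioner {
        private int cachedPartitions;
        void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
            this.cachedPartitions = partitions.length;  // frozen at open() time
        }
        int partition(byte[] key) {
            return Math.abs(java.util.Arrays.hashCode(key)) % cachedPartitions;
        }
    }

    // Alternative: use the partition count passed with each record.
    static class PerCallPartitioner {
        int partition(byte[] key, int partitionNum) {
            return Math.abs(java.util.Arrays.hashCode(key)) % partitionNum;
        }
    }

    public static void main(String[] args) {
        CachingPartitioner caching = new CachingPartitioner();
        caching.open(0, 1, new int[]{0, 1});   // default topic: 2 partitions
        byte[] key = {7};
        // Even for a target topic with 8 partitions, the cached count limits us to 0..1:
        System.out.println(caching.partition(key));
        // The per-call variant can spread over all 8 partitions of the target topic:
        System.out.println(new PerCallPartitioner().partition(key, 8));
    }
}
```

This is why the old `open()`-based API cannot fully support dynamically created topics, as the comment notes.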
[GitHub] flink pull request #3785: [FLINK-6337][network] Remove the buffer provider f...
GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/3785

[FLINK-6337][network] Remove the buffer provider from PartitionRequestServerHandler

Currently, `PartitionRequestServerHandler` creates a `LocalBufferPool` when the channel is registered. The `LocalBufferPool` is only used to get the segment size for creating the read view in `SpillableSubpartition`; its buffers are never used, so it wastes buffer resources of the global pool.

We would like to remove the `LocalBufferPool` from the `PartitionRequestServerHandler`; the `LocalBufferPool` in `ResultPartition` can also provide the segment size for creating the subpartition view. This modification will not affect the current behavior and saves buffer resources.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhijiangW/flink FLINK-6337

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3785.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3785

commit 306f32414eabce5522ec2a8883aa27285e6aa3e4
Author: Zhijiang
Date: 2017-04-26T08:18:54Z

    [FLINK-6337][network] Remove the buffer provider from PartitionRequestServerHandler
[jira] [Commented] (FLINK-6337) Remove the buffer provider from PartitionRequestServerHandler
[ https://issues.apache.org/jira/browse/FLINK-6337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985901#comment-15985901 ]

ASF GitHub Bot commented on FLINK-6337:
---
GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/3785

[FLINK-6337][network] Remove the buffer provider from PartitionRequestServerHandler

Currently, `PartitionRequestServerHandler` creates a `LocalBufferPool` when the channel is registered. The `LocalBufferPool` is only used to get the segment size for creating the read view in `SpillableSubpartition`; its buffers are never used, so it wastes buffer resources of the global pool.

We would like to remove the `LocalBufferPool` from the `PartitionRequestServerHandler`; the `LocalBufferPool` in `ResultPartition` can also provide the segment size for creating the subpartition view. This modification will not affect the current behavior and saves buffer resources.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhijiangW/flink FLINK-6337

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3785.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3785

commit 306f32414eabce5522ec2a8883aa27285e6aa3e4
Author: Zhijiang
Date: 2017-04-26T08:18:54Z

    [FLINK-6337][network] Remove the buffer provider from PartitionRequestServerHandler

> Remove the buffer provider from PartitionRequestServerHandler
> -
>
> Key: FLINK-6337
> URL: https://issues.apache.org/jira/browse/FLINK-6337
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Minor
>
> Currently, {{PartitionRequestServerHandler}} creates a {{LocalBufferPool}}
> when the channel is registered. The {{LocalBufferPool}} is only used to get
> the segment size for creating the read view in {{SpillableSubpartition}};
> its buffers are never used, so it wastes buffer resources of the global pool.
> We would like to remove the {{LocalBufferPool}} from the
> {{PartitionRequestServerHandler}}; the {{LocalBufferPool}} in
> {{ResultPartition}} can also provide the segment size for creating the
> subpartition view.
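The shape of the refactoring described above can be sketched as follows. This is an illustrative stand-in, not actual Flink code: instead of each server handler allocating its own buffer pool just to read the memory segment size, the handler keeps only a way to look the size up from the pool the result partition already owns.

```java
import java.util.function.IntSupplier;

// Illustrative sketch of the FLINK-6337 idea (class names are stand-ins, not
// Flink's): the handler no longer owns a LocalBufferPool; it queries the
// segment size from the ResultPartition's existing pool instead.
public class SegmentSizeSketch {

    // Stand-in for the LocalBufferPool owned by the ResultPartition.
    static class BufferPool {
        private final int memorySegmentSize;
        BufferPool(int memorySegmentSize) { this.memorySegmentSize = memorySegmentSize; }
        int getMemorySegmentSize() { return memorySegmentSize; }
    }

    // The handler only keeps a supplier for the size, allocating no buffers.
    static class ServerHandler {
        private final IntSupplier segmentSize;
        ServerHandler(IntSupplier segmentSize) { this.segmentSize = segmentSize; }
        String createReadView() {
            return "read view with segment size " + segmentSize.getAsInt();
        }
    }

    public static void main(String[] args) {
        BufferPool partitionPool = new BufferPool(32 * 1024);  // one pool, owned elsewhere
        ServerHandler handler = new ServerHandler(partitionPool::getMemorySegmentSize);
        System.out.println(handler.createReadView());
    }
}
```

The saving comes from never reserving global-pool buffers for a pool whose buffers were never used.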
[jira] [Commented] (FLINK-6247) Build a jar-with-dependencies for flink-table and put it into ./opt
[ https://issues.apache.org/jira/browse/FLINK-6247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985890#comment-15985890 ]

ASF GitHub Bot commented on FLINK-6247:
---
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3666

Hi @fhueske, thanks for your review, and sorry for the late reply. I have updated the PR. Please take a look. Thanks a lot.

Thanks,
SunJincheng

> Build a jar-with-dependencies for flink-table and put it into ./opt
> ---
>
> Key: FLINK-6247
> URL: https://issues.apache.org/jira/browse/FLINK-6247
> Project: Flink
> Issue Type: Improvement
> Components: Build System, Table API & SQL
> Affects Versions: 1.3.0
> Reporter: Fabian Hueske
> Assignee: sunjincheng
>
> Due to a problem with Calcite and the unloading of classes, user-code
> classloaders that include Calcite cannot be garbage collected. This is a
> problem for long-running clusters that execute multiple Table API / SQL
> programs with fat JARs that include the flink-table dependency. Each executed
> program comes with its own user-code classloader that cannot be cleaned up
> later.
> As a workaround, we recommend copying the flink-table dependency into the
> ./lib folder. However, we do not have a jar file with all required transitive
> dependencies (Calcite, Janino, etc.). Hence, users would need to build this
> jar file themselves or copy all jars into ./lib.
> This issue is about creating a jar-with-dependencies and adding it to the
> ./opt folder. Users can then copy the jar file from ./opt to ./lib to include
> the Table API in the classpath of Flink.
[GitHub] flink issue #3666: [FLINK-6247][table] Build a jar-with-dependencies for fli...
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3666

Hi @fhueske, thanks for your review, and sorry for the late reply. I have updated the PR. Please take a look. Thanks a lot.

Thanks,
SunJincheng
[jira] [Updated] (FLINK-6392) Change the alias of Window from optional to essential.
[ https://issues.apache.org/jira/browse/FLINK-6392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sunjincheng updated FLINK-6392:
---
Description:

Currently, the window clause is used as follows:
{code}
tab //Table('a,'b,'c)
   .window( Slide over 10.milli every 5.milli as 'w)
   .groupBy('w,'a,'b)
   .select('a, 'b, 'c.sum, 'w.start, 'w.end)
{code}
As shown, the window alias is essential, but the current Table API implementation does not enforce this constraint. We should therefore refactor the API definition to use the type system to enforce the alias.

was:

Currently, the window clause is used as follows:
{code}
tab //Table('a,'b,'c)
   .window( Slide over 10.milli every 5.milli as 'w)
   .groupBy('w,'a,'b) // WindowGroupedTable
   .select('a, 'b, 'c.sum, 'w.start, 'w.end)
{code}
As shown, the window alias is essential, but the current Table API implementation does not enforce this constraint. We should therefore refactor the API definition to use the type system to enforce the alias.

> Change the alias of Window from optional to essential.
> --
>
> Key: FLINK-6392
> URL: https://issues.apache.org/jira/browse/FLINK-6392
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: sunjincheng
> Assignee: sunjincheng
> Fix For: 1.3.0
>
> Currently, the window clause is used as follows:
> {code}
> tab //Table('a,'b,'c)
>    .window( Slide over 10.milli every 5.milli as 'w)
>    .groupBy('w,'a,'b)
>    .select('a, 'b, 'c.sum, 'w.start, 'w.end)
> {code}
> As shown, the window alias is essential, but the current Table API
> implementation does not enforce this constraint. We should therefore refactor
> the API definition to use the type system to enforce the alias.
[jira] [Updated] (FLINK-6392) Change the alias of Window from optional to essential.
[ https://issues.apache.org/jira/browse/FLINK-6392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sunjincheng updated FLINK-6392:
---
Description:

Currently, the window clause is used as follows:
{code}
tab //Table('a,'b,'c)
   .window( Slide over 10.milli every 5.milli as 'w)
   .groupBy('w,'a,'b) // WindowGroupedTable
   .select('a, 'b, 'c.sum, 'w.start, 'w.end)
{code}
As shown, the window alias is essential, but the current Table API implementation does not enforce this constraint. We should therefore refactor the API definition to use the type system to enforce the alias.

was:

Currently, the window clause is used as follows:
{code}
tab //Table('a,'b,'c)
   .window( Slide over 10.milli every 5.milli as 'w)
   .groupBy('w,'a,'b) // WindowGroupedTable
   .select('a, 'b, 'c.sum, 'w.start, 'w.end)
{code}
As shown, the window alias is essential, but the current Table API implementation does not enforce this constraint. We should therefore refactor the API definition to use the type system to enforce the alias.

> Change the alias of Window from optional to essential.
> --
>
> Key: FLINK-6392
> URL: https://issues.apache.org/jira/browse/FLINK-6392
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: sunjincheng
> Assignee: sunjincheng
> Fix For: 1.3.0
>
> Currently, the window clause is used as follows:
> {code}
> tab //Table('a,'b,'c)
>    .window( Slide over 10.milli every 5.milli as 'w)
>    .groupBy('w,'a,'b) // WindowGroupedTable
>    .select('a, 'b, 'c.sum, 'w.start, 'w.end)
> {code}
> As shown, the window alias is essential, but the current Table API
> implementation does not enforce this constraint. We should therefore refactor
> the API definition to use the type system to enforce the alias.
[jira] [Updated] (FLINK-6392) Change the alias of Window from optional to essential.
[ https://issues.apache.org/jira/browse/FLINK-6392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sunjincheng updated FLINK-6392:
---
Issue Type: Sub-task (was: Improvement)
Parent: FLINK-4557

> Change the alias of Window from optional to essential.
> --
>
> Key: FLINK-6392
> URL: https://issues.apache.org/jira/browse/FLINK-6392
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: sunjincheng
> Assignee: sunjincheng
> Fix For: 1.3.0
>
> Currently, the window clause is used as follows:
> {code}
> tab //Table('a,'b,'c)
>    .window( Slide over 10.milli every 5.milli as 'w)
>    .groupBy('w,'a,'b) // WindowGroupedTable
>    .select('a, 'b, 'c.sum, 'w.start, 'w.end)
> {code}
> As shown, the window alias is essential, but the current Table API
> implementation does not enforce this constraint. We should therefore refactor
> the API definition to use the type system to enforce the alias.
[jira] [Created] (FLINK-6392) Change the alias of Window from optional to essential.
sunjincheng created FLINK-6392:
---
Summary: Change the alias of Window from optional to essential.
Key: FLINK-6392
URL: https://issues.apache.org/jira/browse/FLINK-6392
Project: Flink
Issue Type: Improvement
Components: Table API & SQL
Affects Versions: 1.3.0
Reporter: sunjincheng
Assignee: sunjincheng
Fix For: 1.3.0

Currently, the window clause is used as follows:
{code}
tab //Table('a,'b,'c)
   .window( Slide over 10.milli every 5.milli as 'w)
   .groupBy('w,'a,'b) // WindowGroupedTable
   .select('a, 'b, 'c.sum, 'w.start, 'w.end)
{code}
As shown, the window alias is essential, but the current Table API implementation does not enforce this constraint. We should therefore refactor the API definition to use the type system to enforce the alias.
[jira] [Commented] (FLINK-6391) fix build for scala 2.11 (gelly-examples)
[ https://issues.apache.org/jira/browse/FLINK-6391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985714#comment-15985714 ] ASF GitHub Bot commented on FLINK-6391: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/3784 @greghogan that might be a good idea but I don't know much about it. I'm merely focused on fixing the build break. > fix build for scala 2.11 (gelly-examples) > - > > Key: FLINK-6391 > URL: https://issues.apache.org/jira/browse/FLINK-6391 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Eron Wright >Assignee: Eron Wright > > After switching the build to Scala 2.11 (using > `tools/change-scala-version.sh`), the build fails in flink-dist module. > {code} > ... > [INFO] flink-dist . FAILURE [ 19.337 > s] > [INFO] flink-fs-tests . SKIPPED > [INFO] > > [INFO] BUILD FAILURE > [INFO] > > [INFO] Total time: 31:16 min > [INFO] Finished at: 2017-04-26T15:17:43-07:00 > [INFO] Final Memory: 380M/1172M > [INFO] > > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-assembly-plugin:2.6:single (bin) on project > flink-dist_2.11: Failed to create assembly: Error adding file to archive: > /Users/wrighe3/Projects/flink/flink-dist/../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-1.3-SNAPSHOT.jar > -> [Help 1] > {code} > The root cause appears to be that the change-scala-version tool should update > flink-dist/.../assemblies/bin.xml to use the correct version of > flink-gelly-examples. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3784: FLINK-6391
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/3784

@greghogan that might be a good idea but I don't know much about it. I'm merely focused on fixing the build break.
[jira] [Commented] (FLINK-6360) Failed to create assembly for Scala 2.11 due to tools/change-scala-version.sh not changing flink-gelly-examples_2.10
[ https://issues.apache.org/jira/browse/FLINK-6360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985711#comment-15985711 ] Greg Hogan commented on FLINK-6360: --- [~jlaskowski], have you had a chance to work on this bug? It's impacting multiple developers and next Monday's code freeze necessitates building a release candidate. > Failed to create assembly for Scala 2.11 due to tools/change-scala-version.sh > not changing flink-gelly-examples_2.10 > > > Key: FLINK-6360 > URL: https://issues.apache.org/jira/browse/FLINK-6360 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.3.0 >Reporter: Jacek Laskowski >Priority: Minor > > I'm building Flink for Scala 2.11 using the following command: > {code} > oss; cd flink; gco -- .; gl && \ > ./tools/change-scala-version.sh 2.11 && \ > mvn clean install -DskipTests -Dhadoop.version=2.7.3 -Dscala.version=2.11.7 > {code} > For the past couple of days I've been unable to build Flink because of the > following error: > {code} > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-assembly-plugin:2.6:single (bin) on project > flink-dist_2.11: Failed to create assembly: Error adding file to archive: > /Users/jacek/dev/oss/flink/flink-dist/../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-1.3-SNAPSHOT.jar > -> [Help 1] > {code} > I've been able to trace it down to {{flink-dist/src/main/assemblies/bin.xml}} > not being changed by {{./tools/change-scala-version.sh}} so the build tries > to include {{flink-gelly-examples_2.10}} rather than > {{flink-gelly-examples_2.11}}. > Please comment if my reasoning is correct or not and I'd be happy to work on > it. Thanks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6391) fix build for scala 2.11 (gelly-examples)
[ https://issues.apache.org/jira/browse/FLINK-6391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985710#comment-15985710 ] ASF GitHub Bot commented on FLINK-6391: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/3784 Any ideas why we can't just reference `scala.binary.version` in the Maven configurations rather than specifying `2.10` and modifying this in the change script? I don't see why `change-scala-version.sh` is any more complicated than `change-version.sh`. > fix build for scala 2.11 (gelly-examples) > - > > Key: FLINK-6391 > URL: https://issues.apache.org/jira/browse/FLINK-6391 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Eron Wright >Assignee: Eron Wright > > After switching the build to Scala 2.11 (using > `tools/change-scala-version.sh`), the build fails in flink-dist module. > {code} > ... > [INFO] flink-dist . FAILURE [ 19.337 > s] > [INFO] flink-fs-tests . SKIPPED > [INFO] > > [INFO] BUILD FAILURE > [INFO] > > [INFO] Total time: 31:16 min > [INFO] Finished at: 2017-04-26T15:17:43-07:00 > [INFO] Final Memory: 380M/1172M > [INFO] > > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-assembly-plugin:2.6:single (bin) on project > flink-dist_2.11: Failed to create assembly: Error adding file to archive: > /Users/wrighe3/Projects/flink/flink-dist/../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-1.3-SNAPSHOT.jar > -> [Help 1] > {code} > The root cause appears to be that the change-scala-version tool should update > flink-dist/.../assemblies/bin.xml to use the correct version of > flink-gelly-examples. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3784: FLINK-6391
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3784

Any ideas why we can't just reference `scala.binary.version` in the Maven configurations rather than specifying `2.10` and modifying this in the change script? I don't see why `change-scala-version.sh` is any more complicated than `change-version.sh`.
[GitHub] flink issue #3511: [Flink-5734] code generation for normalizedkey sorter
Github user heytitle commented on the issue:

https://github.com/apache/flink/pull/3511

I have also thought about the abstract class, but I'm not sure how to do it properly.
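For context, the abstract-class idea under discussion can be sketched minimally as follows. The class names are hypothetical, not the PR's actual sorter classes: the stable, shared sorter logic lives in an abstract base, and a subclass (hand-written here, code-generated in the PR's setting) supplies only the record-specific key comparison.

```java
// Minimal sketch of the abstract-class approach discussed above: shared sort
// logic in an abstract base, record-specific comparison in a (generated)
// subclass. Class names here are hypothetical illustrations.
abstract class AbstractSorterBase {
    // Stable, shared logic: a simple insertion sort driven by compareKeys().
    final void sort(int[] records) {
        for (int i = 1; i < records.length; i++) {
            for (int j = i; j > 0 && compareKeys(records[j - 1], records[j]) > 0; j--) {
                int tmp = records[j]; records[j] = records[j - 1]; records[j - 1] = tmp;
            }
        }
    }
    // A code generator would emit a specialized implementation of this hook.
    abstract int compareKeys(int a, int b);
}

// Stands in for a code-generated subclass specialized to one key layout.
class GeneratedIntSorter extends AbstractSorterBase {
    @Override
    int compareKeys(int a, int b) { return Integer.compare(a, b); }
}

public class SorterSketch {
    public static void main(String[] args) {
        int[] data = {3, 1, 2};
        new GeneratedIntSorter().sort(data);
        System.out.println(java.util.Arrays.toString(data));  // [1, 2, 3]
    }
}
```

The benefit of this split is that only the small, hot comparison hook needs to be generated and JIT-specialized, while the surrounding sort machinery stays ordinary reviewable code.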
[jira] [Commented] (FLINK-6360) Failed to create assembly for Scala 2.11 due to tools/change-scala-version.sh not changing flink-gelly-examples_2.10
[ https://issues.apache.org/jira/browse/FLINK-6360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985700#comment-15985700 ] Eron Wright commented on FLINK-6360: - I fixed this and opened a PR at the same time as Greg marked it as a dup. Anyway here's the PR: https://github.com/apache/flink/pull/3784 > Failed to create assembly for Scala 2.11 due to tools/change-scala-version.sh > not changing flink-gelly-examples_2.10 > > > Key: FLINK-6360 > URL: https://issues.apache.org/jira/browse/FLINK-6360 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.3.0 >Reporter: Jacek Laskowski >Priority: Minor > > I'm building Flink for Scala 2.11 using the following command: > {code} > oss; cd flink; gco -- .; gl && \ > ./tools/change-scala-version.sh 2.11 && \ > mvn clean install -DskipTests -Dhadoop.version=2.7.3 -Dscala.version=2.11.7 > {code} > For the past couple of days I've been unable to build Flink because of the > following error: > {code} > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-assembly-plugin:2.6:single (bin) on project > flink-dist_2.11: Failed to create assembly: Error adding file to archive: > /Users/jacek/dev/oss/flink/flink-dist/../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-1.3-SNAPSHOT.jar > -> [Help 1] > {code} > I've been able to trace it down to {{flink-dist/src/main/assemblies/bin.xml}} > not being changed by {{./tools/change-scala-version.sh}} so the build tries > to include {{flink-gelly-examples_2.10}} rather than > {{flink-gelly-examples_2.11}}. > Please comment if my reasoning is correct or not and I'd be happy to work on > it. Thanks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-6360) Failed to create assembly for Scala 2.11 due to tools/change-scala-version.sh not changing flink-gelly-examples_2.10
[ https://issues.apache.org/jira/browse/FLINK-6360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985700#comment-15985700 ] Eron Wright edited comment on FLINK-6360 at 4/26/17 10:55 PM: --- I fixed this and opened a PR at the same time as Greg marked FLINK-6391 as a dup. Anyway here's the PR: https://github.com/apache/flink/pull/3784 was (Author: eronwright): I fixed this and opened a PR at the same time as Greg marked it as a dup. Anyway here's the PR: https://github.com/apache/flink/pull/3784 > Failed to create assembly for Scala 2.11 due to tools/change-scala-version.sh > not changing flink-gelly-examples_2.10 > > > Key: FLINK-6360 > URL: https://issues.apache.org/jira/browse/FLINK-6360 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.3.0 >Reporter: Jacek Laskowski >Priority: Minor > > I'm building Flink for Scala 2.11 using the following command: > {code} > oss; cd flink; gco -- .; gl && \ > ./tools/change-scala-version.sh 2.11 && \ > mvn clean install -DskipTests -Dhadoop.version=2.7.3 -Dscala.version=2.11.7 > {code} > For the past couple of days I've been unable to build Flink because of the > following error: > {code} > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-assembly-plugin:2.6:single (bin) on project > flink-dist_2.11: Failed to create assembly: Error adding file to archive: > /Users/jacek/dev/oss/flink/flink-dist/../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-1.3-SNAPSHOT.jar > -> [Help 1] > {code} > I've been able to trace it down to {{flink-dist/src/main/assemblies/bin.xml}} > not being changed by {{./tools/change-scala-version.sh}} so the build tries > to include {{flink-gelly-examples_2.10}} rather than > {{flink-gelly-examples_2.11}}. > Please comment if my reasoning is correct or not and I'd be happy to work on > it. Thanks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6391) fix build for scala 2.11 (gelly-examples)
[ https://issues.apache.org/jira/browse/FLINK-6391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985699#comment-15985699 ] ASF GitHub Bot commented on FLINK-6391: --- GitHub user EronWright opened a pull request: https://github.com/apache/flink/pull/3784 FLINK-6391 FLINK-6391 Improve change-scala-version.sh to correctly deal with references to flink-gelly-examples. You can merge this pull request into a Git repository by running: $ git pull https://github.com/EronWright/flink FLINK-6391 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3784.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3784 commit 0985697f071133abf0a512a4a79834a1ccd19cde Author: Wright, EronDate: 2017-04-26T22:48:57Z FLINK-6391 Improve change-scala-version.sh to correctly deal with references to flink-gelly-examples. > fix build for scala 2.11 (gelly-examples) > - > > Key: FLINK-6391 > URL: https://issues.apache.org/jira/browse/FLINK-6391 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Eron Wright >Assignee: Eron Wright > > After switching the build to Scala 2.11 (using > `tools/change-scala-version.sh`), the build fails in flink-dist module. > {code} > ... > [INFO] flink-dist . FAILURE [ 19.337 > s] > [INFO] flink-fs-tests . 
SKIPPED > [INFO] > > [INFO] BUILD FAILURE > [INFO] > > [INFO] Total time: 31:16 min > [INFO] Finished at: 2017-04-26T15:17:43-07:00 > [INFO] Final Memory: 380M/1172M > [INFO] > > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-assembly-plugin:2.6:single (bin) on project > flink-dist_2.11: Failed to create assembly: Error adding file to archive: > /Users/wrighe3/Projects/flink/flink-dist/../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-1.3-SNAPSHOT.jar > -> [Help 1] > {code} > The root cause appears to be that the change-scala-version tool should update > flink-dist/.../assemblies/bin.xml to use the correct version of > flink-gelly-examples. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3784: FLINK-6391
GitHub user EronWright opened a pull request: https://github.com/apache/flink/pull/3784 FLINK-6391 FLINK-6391 Improve change-scala-version.sh to correctly deal with references to flink-gelly-examples. You can merge this pull request into a Git repository by running: $ git pull https://github.com/EronWright/flink FLINK-6391 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3784.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3784 commit 0985697f071133abf0a512a4a79834a1ccd19cde Author: Wright, Eron Date: 2017-04-26T22:48:57Z FLINK-6391 Improve change-scala-version.sh to correctly deal with references to flink-gelly-examples. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---

[jira] [Closed] (FLINK-6391) fix build for scala 2.11 (gelly-examples)
[ https://issues.apache.org/jira/browse/FLINK-6391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan closed FLINK-6391. - Resolution: Duplicate > fix build for scala 2.11 (gelly-examples) > - > > Key: FLINK-6391 > URL: https://issues.apache.org/jira/browse/FLINK-6391 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Eron Wright >Assignee: Eron Wright > > After switching the build to Scala 2.11 (using > `tools/change-scala-version.sh`), the build fails in flink-dist module. > {code} > ... > [INFO] flink-dist . FAILURE [ 19.337 > s] > [INFO] flink-fs-tests . SKIPPED > [INFO] > > [INFO] BUILD FAILURE > [INFO] > > [INFO] Total time: 31:16 min > [INFO] Finished at: 2017-04-26T15:17:43-07:00 > [INFO] Final Memory: 380M/1172M > [INFO] > > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-assembly-plugin:2.6:single (bin) on project > flink-dist_2.11: Failed to create assembly: Error adding file to archive: > /Users/wrighe3/Projects/flink/flink-dist/../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-1.3-SNAPSHOT.jar > -> [Help 1] > {code} > The root cause appears to be that the change-scala-version tool should update > flink-dist/.../assemblies/bin.xml to use the correct version of > flink-gelly-examples. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6391) fix build for scala 2.11 (gelly-examples)
Eron Wright created FLINK-6391: --- Summary: fix build for scala 2.11 (gelly-examples) Key: FLINK-6391 URL: https://issues.apache.org/jira/browse/FLINK-6391 Project: Flink Issue Type: Bug Components: Build System Reporter: Eron Wright Assignee: Eron Wright After switching the build to Scala 2.11 (using `tools/change-scala-version.sh`), the build fails in flink-dist module. {code} ... [INFO] flink-dist . FAILURE [ 19.337 s] [INFO] flink-fs-tests . SKIPPED [INFO] [INFO] BUILD FAILURE [INFO] [INFO] Total time: 31:16 min [INFO] Finished at: 2017-04-26T15:17:43-07:00 [INFO] Final Memory: 380M/1172M [INFO] [ERROR] Failed to execute goal org.apache.maven.plugins:maven-assembly-plugin:2.6:single (bin) on project flink-dist_2.11: Failed to create assembly: Error adding file to archive: /Users/wrighe3/Projects/flink/flink-dist/../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-1.3-SNAPSHOT.jar -> [Help 1] {code} The root cause appears to be that the change-scala-version tool should update flink-dist/.../assemblies/bin.xml to use the correct version of flink-gelly-examples. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3511: [Flink-5734] code generation for normalizedkey sorter
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/3511 @heytitle, you can `rebase -i dd20^` and then delete the first two lines which removes those commits from the history. An initial thought from skimming the code: should we create an abstract `NormalizedKeySorterBase` with common code from the generated and non-generated implementations? This way the lines of code in the template would be minimized and we wouldn't need to synchronize changes. I don't see a reason why this would decrease performance.
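Greg's suggestion can also be scripted for illustration: setting `GIT_SEQUENCE_EDITOR` to a sed command that deletes the first two `pick` lines is the non-interactive equivalent of `git rebase -i dd20^` followed by removing those lines by hand. The throwaway repository and commit names below are invented for the demo; `dd20` refers to a commit on the actual PR branch.

```shell
set -e
# Throwaway repository with a base commit, two commits to drop, and one to keep.
repo=$(mktemp -d)
cd "$repo"
git init -q .
git config user.email demo@example.com
git config user.name demo
for name in base flink3722-a flink3722-b keep; do
    echo "$name" > "$name.txt"
    git add "$name.txt"
    git commit -q -m "$name"
done

# Scripted equivalent of `git rebase -i <base>` plus deleting the first two
# `pick` lines: the two flink3722-* commits vanish from the history.
GIT_SEQUENCE_EDITOR="sed -i 1,2d" git rebase -i -q HEAD~3

git log --format=%s
```

As heytitle notes, this rewrites history, so the PR branch has to be force-pushed afterwards.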
[jira] [Commented] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance
[ https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985629#comment-15985629 ] ASF GitHub Bot commented on FLINK-3722: --- Github user heytitle commented on the issue: https://github.com/apache/flink/pull/3511 @greghogan May I ask you how to remove `FLINK-3722` commits?. Only way I can think of is `git rebase -i`, but this will rewrite history of this PR. > The divisions in the InMemorySorters' swap/compare methods hurt performance > --- > > Key: FLINK-3722 > URL: https://issues.apache.org/jira/browse/FLINK-3722 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Gabor Gevay >Assignee: Greg Hogan >Priority: Minor > Labels: performance > Fix For: 1.3.0 > > > NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods > use divisions (which take a lot of time \[1\]) to calculate the index of the > MemorySegment and the offset inside the segment. [~greghogan] reported on the > mailing list \[2\] measuring a ~12-14% performance effect in one case. > A possibility to improve the situation is the following: > The way that QuickSort mostly uses these compare and swap methods is that it > maintains two indices, and uses them to call compare and swap. The key > observation is that these indices are mostly stepped by one, and > _incrementally_ calculating the quotient and modulo is actually easy when the > index changes only by one: increment/decrement the modulo, and check whether > the modulo has reached 0 or the divisor, and if it did, then wrap-around the > modulo and increment/decrement the quotient. > To implement this, InMemorySorter would have to expose an iterator that would > have the divisor and the current modulo and quotient as state, and have > increment/decrement methods. Compare and swap could then have overloads that > take these iterators as arguments. 
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf > \[2\] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html
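The incremental quotient/modulo bookkeeping proposed in the description can be sketched as follows. This is a standalone illustration of the technique, not Flink's actual InMemorySorter code; `segmentSize` stands in for the divisor, and the position is assumed to step by one slot at a time:

```java
// Instead of recomputing index = position / segmentSize and
// offset = position % segmentSize with a division on every step,
// keep both as state and adjust them incrementally.
class SegmentPosition {
    private final int segmentSize; // the divisor (slots per memory segment)
    private int segmentIndex;      // the quotient
    private int offset;            // the modulo

    SegmentPosition(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    /** Step forward by one slot, wrapping into the next segment when needed. */
    void increment() {
        offset++;
        if (offset == segmentSize) { // modulo reached the divisor: wrap around
            offset = 0;
            segmentIndex++;
        }
    }

    /** Step backward by one slot, wrapping into the previous segment when needed. */
    void decrement() {
        if (offset == 0) {           // modulo reached zero: wrap around
            offset = segmentSize - 1;
            segmentIndex--;
        } else {
            offset--;
        }
    }

    int segmentIndex() { return segmentIndex; }
    int offset() { return offset; }
}
```

Compare and swap overloads taking such an iterator as an argument could then address memory segments without any division in the hot loop, as the description suggests.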
[GitHub] flink issue #3511: [Flink-5734] code generation for normalizedkey sorter
Github user heytitle commented on the issue: https://github.com/apache/flink/pull/3511 @greghogan May I ask you how to remove the `FLINK-3722` commits? The only way I can think of is `git rebase -i`, but this will rewrite the history of this PR.
[jira] [Closed] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator
[ https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-6390. --- > Add Trigger Hooks to the Checkpoint Coordinator > --- > > Key: FLINK-6390 > URL: https://issues.apache.org/jira/browse/FLINK-6390 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Some source systems require to be notified prior to starting a checkpoint, in > order to do preparatory work for the checkpoint. > I propose to add an interface to allow sources to register hooks that are > called by the checkpoint coordinator when triggering / restoring a checkpoint. > These hooks may produce state that is stores with the checkpoint metadata. > Envisioned interface for the hooks > {code} > /** > * The interface for hooks that can be called by the checkpoint coordinator > when triggering or > * restoring a checkpoint. Such a hook is useful for example when preparing > external systems for > * taking or restoring checkpoints. > * > * The {@link #triggerCheckpoint(long, long, Executor)} method (called > when triggering a checkpoint) > * can return a result (via a future) that will be stored as part of the > checkpoint metadata. > * When restoring a checkpoint, that stored result will be given to the > {@link #restoreCheckpoint(long, Object)} > * method. The hook's {@link #getIdentifier() identifier} is used to map data > to hook in the presence > * of multiple hooks, and when resuming a savepoint that was potentially > created by a different job. > * The identifier has a similar role as for example the operator UID in the > streaming API. > * > * The MasterTriggerRestoreHook is defined when creating the streaming > dataflow graph. It is attached > * to the job graph, which gets sent to the cluster for execution. 
To avoid > having to make the hook > * itself serializable, these hooks are attached to the job graph via a > {@link MasterTriggerRestoreHook.Factory}. > * > * @param The type of the data produced by the hook and stored as part of > the checkpoint metadata. > *If the hook never stores any data, this can be typed to {@code > Void}. > */ > public interface MasterTriggerRestoreHook { > /** >* Gets the identifier of this hook. The identifier is used to identify > a specific hook in the >* presence of multiple hooks and to give it the correct checkpointed > data upon checkpoint restoration. >* >* The identifier should be unique between different hooks of a job, > but deterministic/constant >* so that upon resuming a savepoint, the hook will get the correct > data. >* For example, if the hook calls into another storage system and > persists namespace/schema specific >* information, then the name of the storage system, together with the > namespace/schema name could >* be an appropriate identifier. >* >* When multiple hooks of the same name are created and attached to > a job graph, only the first >* one is actually used. This can be exploited to deduplicate hooks > that would do the same thing. >* >* @return The identifier of the hook. >*/ > String getIdentifier(); > /** >* This method is called by the checkpoint coordinator prior when > triggering a checkpoint, prior >* to sending the "trigger checkpoint" messages to the source tasks. >* >* If the hook implementation wants to store data as part of the > checkpoint, it may return >* that data via a future, otherwise it should return null. The data is > stored as part of >* the checkpoint metadata under the hooks identifier (see {@link > #getIdentifier()}). >* >* If the action by this hook needs to be executed synchronously, > then this method should >* directly execute the action synchronously and block until it is > complete. The returned future >* (if any) would typically be a completed future. 
>* >* If the action should be executed asynchronously and only needs to > complete before the >* checkpoint is considered completed, then the method may use the > given executor to execute the >* actual action and would signal its completion by completing the > future. For hooks that do not >* need to store data, the future would be completed with null. >* >* @param checkpointId The ID (logical timestamp, monotonously > increasing) of the checkpoint >* @param timestamp The wall clock timestamp when the checkpoint was > triggered, for >* info/logging purposes. >
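To make the quoted Javadoc concrete, here is a minimal, self-contained sketch of a hook implementation. The interface below is a trimmed stand-in re-declared from the Javadoc above (the real class lives in Flink's runtime and its future type may differ), and the external-system interaction is a placeholder:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Trimmed stand-in for the interface described in the Javadoc above.
interface MasterTriggerRestoreHook<T> {
    String getIdentifier();
    CompletableFuture<T> triggerCheckpoint(long checkpointId, long timestamp, Executor executor);
    void restoreCheckpoint(long checkpointId, T checkpointData) throws Exception;
}

/** Hypothetical hook that stores an external system's position in the checkpoint metadata. */
class ExternalOffsetHook implements MasterTriggerRestoreHook<Long> {

    @Override
    public String getIdentifier() {
        // Deterministic/constant, so restored data is routed back to this hook.
        return "external-offset-hook";
    }

    @Override
    public CompletableFuture<Long> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
        // Asynchronous variant: run the (placeholder) external call on the given
        // executor and complete the future with the data to store.
        return CompletableFuture.supplyAsync(this::fetchExternalOffset, executor);
    }

    @Override
    public void restoreCheckpoint(long checkpointId, Long storedOffset) {
        // Placeholder: reset the external system to the stored position.
    }

    private Long fetchExternalOffset() {
        return 42L; // placeholder for a real external-system query
    }
}
```

A synchronous hook would instead do its work directly in `triggerCheckpoint` and return an already-completed future, as the Javadoc describes.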
[jira] [Resolved] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator
[ https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-6390. - Resolution: Fixed Implemented in 90ca438106e63c5032ee2ad27e54e9f573eac386 > Add Trigger Hooks to the Checkpoint Coordinator > --- > > Key: FLINK-6390 > URL: https://issues.apache.org/jira/browse/FLINK-6390 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Some source systems require to be notified prior to starting a checkpoint, in > order to do preparatory work for the checkpoint. > I propose to add an interface to allow sources to register hooks that are > called by the checkpoint coordinator when triggering / restoring a checkpoint. > These hooks may produce state that is stores with the checkpoint metadata. > Envisioned interface for the hooks > {code} > /** > * The interface for hooks that can be called by the checkpoint coordinator > when triggering or > * restoring a checkpoint. Such a hook is useful for example when preparing > external systems for > * taking or restoring checkpoints. > * > * The {@link #triggerCheckpoint(long, long, Executor)} method (called > when triggering a checkpoint) > * can return a result (via a future) that will be stored as part of the > checkpoint metadata. > * When restoring a checkpoint, that stored result will be given to the > {@link #restoreCheckpoint(long, Object)} > * method. The hook's {@link #getIdentifier() identifier} is used to map data > to hook in the presence > * of multiple hooks, and when resuming a savepoint that was potentially > created by a different job. > * The identifier has a similar role as for example the operator UID in the > streaming API. > * > * The MasterTriggerRestoreHook is defined when creating the streaming > dataflow graph. It is attached > * to the job graph, which gets sent to the cluster for execution. 
To avoid > having to make the hook > * itself serializable, these hooks are attached to the job graph via a > {@link MasterTriggerRestoreHook.Factory}. > * > * @param The type of the data produced by the hook and stored as part of > the checkpoint metadata. > *If the hook never stores any data, this can be typed to {@code > Void}. > */ > public interface MasterTriggerRestoreHook { > /** >* Gets the identifier of this hook. The identifier is used to identify > a specific hook in the >* presence of multiple hooks and to give it the correct checkpointed > data upon checkpoint restoration. >* >* The identifier should be unique between different hooks of a job, > but deterministic/constant >* so that upon resuming a savepoint, the hook will get the correct > data. >* For example, if the hook calls into another storage system and > persists namespace/schema specific >* information, then the name of the storage system, together with the > namespace/schema name could >* be an appropriate identifier. >* >* When multiple hooks of the same name are created and attached to > a job graph, only the first >* one is actually used. This can be exploited to deduplicate hooks > that would do the same thing. >* >* @return The identifier of the hook. >*/ > String getIdentifier(); > /** >* This method is called by the checkpoint coordinator prior when > triggering a checkpoint, prior >* to sending the "trigger checkpoint" messages to the source tasks. >* >* If the hook implementation wants to store data as part of the > checkpoint, it may return >* that data via a future, otherwise it should return null. The data is > stored as part of >* the checkpoint metadata under the hooks identifier (see {@link > #getIdentifier()}). >* >* If the action by this hook needs to be executed synchronously, > then this method should >* directly execute the action synchronously and block until it is > complete. The returned future >* (if any) would typically be a completed future. 
>* >* If the action should be executed asynchronously and only needs to > complete before the >* checkpoint is considered completed, then the method may use the > given executor to execute the >* actual action and would signal its completion by completing the > future. For hooks that do not >* need to store data, the future would be completed with null. >* >* @param checkpointId The ID (logical timestamp, monotonously > increasing) of the checkpoint >* @param timestamp The wall clock timestamp when the
[jira] [Commented] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator
[ https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985621#comment-15985621 ] ASF GitHub Bot commented on FLINK-6390: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3782 > Add Trigger Hooks to the Checkpoint Coordinator > --- > > Key: FLINK-6390 > URL: https://issues.apache.org/jira/browse/FLINK-6390 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Some source systems require to be notified prior to starting a checkpoint, in > order to do preparatory work for the checkpoint. > I propose to add an interface to allow sources to register hooks that are > called by the checkpoint coordinator when triggering / restoring a checkpoint. > These hooks may produce state that is stores with the checkpoint metadata. > Envisioned interface for the hooks > {code} > /** > * The interface for hooks that can be called by the checkpoint coordinator > when triggering or > * restoring a checkpoint. Such a hook is useful for example when preparing > external systems for > * taking or restoring checkpoints. > * > * The {@link #triggerCheckpoint(long, long, Executor)} method (called > when triggering a checkpoint) > * can return a result (via a future) that will be stored as part of the > checkpoint metadata. > * When restoring a checkpoint, that stored result will be given to the > {@link #restoreCheckpoint(long, Object)} > * method. The hook's {@link #getIdentifier() identifier} is used to map data > to hook in the presence > * of multiple hooks, and when resuming a savepoint that was potentially > created by a different job. > * The identifier has a similar role as for example the operator UID in the > streaming API. > * > * The MasterTriggerRestoreHook is defined when creating the streaming > dataflow graph. 
It is attached > * to the job graph, which gets sent to the cluster for execution. To avoid > having to make the hook > * itself serializable, these hooks are attached to the job graph via a > {@link MasterTriggerRestoreHook.Factory}. > * > * @param The type of the data produced by the hook and stored as part of > the checkpoint metadata. > *If the hook never stores any data, this can be typed to {@code > Void}. > */ > public interface MasterTriggerRestoreHook { > /** >* Gets the identifier of this hook. The identifier is used to identify > a specific hook in the >* presence of multiple hooks and to give it the correct checkpointed > data upon checkpoint restoration. >* >* The identifier should be unique between different hooks of a job, > but deterministic/constant >* so that upon resuming a savepoint, the hook will get the correct > data. >* For example, if the hook calls into another storage system and > persists namespace/schema specific >* information, then the name of the storage system, together with the > namespace/schema name could >* be an appropriate identifier. >* >* When multiple hooks of the same name are created and attached to > a job graph, only the first >* one is actually used. This can be exploited to deduplicate hooks > that would do the same thing. >* >* @return The identifier of the hook. >*/ > String getIdentifier(); > /** >* This method is called by the checkpoint coordinator prior when > triggering a checkpoint, prior >* to sending the "trigger checkpoint" messages to the source tasks. >* >* If the hook implementation wants to store data as part of the > checkpoint, it may return >* that data via a future, otherwise it should return null. The data is > stored as part of >* the checkpoint metadata under the hooks identifier (see {@link > #getIdentifier()}). >* >* If the action by this hook needs to be executed synchronously, > then this method should >* directly execute the action synchronously and block until it is > complete. 
The returned future >* (if any) would typically be a completed future. >* >* If the action should be executed asynchronously and only needs to > complete before the >* checkpoint is considered completed, then the method may use the > given executor to execute the >* actual action and would signal its completion by completing the > future. For hooks that do not >* need to store data, the future would be completed with null. >* >* @param checkpointId The ID (logical timestamp, monotonously > increasing) of the checkpoint >*
[GitHub] flink pull request #3782: [FLINK-6390] [checkpoints] Add API for checkpoints...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3782
[jira] [Commented] (FLINK-4370) Offer a default IntelliJ inspection profile with Flink
[ https://issues.apache.org/jira/browse/FLINK-4370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985606#comment-15985606 ] ASF GitHub Bot commented on FLINK-4370: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2352 @StephanEwen can these be shared under `tools/idea/...`? If they are installed by default then any user modifications will be noted by git. Conversely, most contributors won't notice these files or synchronize as additional inspections are enabled, but as of now developers are on their own. > Offer a default IntelliJ inspection profile with Flink > -- > > Key: FLINK-4370 > URL: https://issues.apache.org/jira/browse/FLINK-4370 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: Stephan Ewen > > We can commit an inspection profile under {{.idea/inspectionProfiles}} which > should be automatically picked up when the code is checked out and imported > into IntelliJ -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #2352: [FLINK-4370] Add an IntelliJ Inspections Profile
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2352 @StephanEwen can these be shared under `tools/idea/...`? If they are installed by default then any user modifications will be noted by git. Conversely, most contributors won't notice these files or synchronize as additional inspections are enabled, but as of now developers are on their own.
[jira] [Commented] (FLINK-6107) Add custom checkstyle for flink-streaming-java
[ https://issues.apache.org/jira/browse/FLINK-6107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985578#comment-15985578 ] Greg Hogan commented on FLINK-6107: --- [~StephanEwen], I think there was a perception that import order was not as consistent throughout the project. Previously there didn't seem to be strong opinions either way. I suspect the import order has drifted out-of-sync from your original work because this has not been enforced. > Add custom checkstyle for flink-streaming-java > -- > > Key: FLINK-6107 > URL: https://issues.apache.org/jira/browse/FLINK-6107 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.3.0 > > > There was some consensus on the ML > (https://lists.apache.org/thread.html/94c8c5186b315c58c3f8aaf536501b99e8b92ee97b0034dee295ff6a@%3Cdev.flink.apache.org%3E) > that we want to have a more uniform code style. We should start > module-by-module and by introducing increasingly stricter rules. We have to > be aware of the PR situation and ensure that we have minimal breakage for > contributors. > This issue aims at adding a custom checkstyle.xml for > {{flink-streaming-java}} that is based on our current checkstyle.xml but adds > these checks for Javadocs: > {code} > > > > > > > > > > > > > > > > > > > > > > > > > {code} > This checks: > - Every type has a type-level Javadoc > - Proper use of {{}} in Javadocs > - First sentence must end with a proper punctuation mark > - Proper use (including closing) of HTML tags -- This message was sent by Atlassian JIRA (v6.3.15#6346)
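The four Javadoc checks listed in the description map naturally onto standard Checkstyle modules. The fragment below is an illustration of that mapping only; the literal checkstyle.xml proposed in the issue did not survive in this message (its {code} block above is empty):

```xml
<!-- Illustrative only: Checkstyle modules covering the four checks above. -->
<module name="TreeWalker">
  <!-- Every type has a type-level Javadoc -->
  <module name="JavadocType"/>
  <!-- First sentence ends with proper punctuation; HTML tags are used and closed properly -->
  <module name="JavadocStyle">
    <property name="checkFirstSentence" value="true"/>
    <property name="checkHtml" value="true"/>
  </module>
  <!-- Paragraphs in Javadoc are separated correctly -->
  <module name="JavadocParagraph"/>
</module>
```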
[jira] [Commented] (FLINK-6281) Create TableSink for JDBC
[ https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985498#comment-15985498 ] ASF GitHub Bot commented on FLINK-6281: --- Github user haohui commented on the issue: https://github.com/apache/flink/pull/3712 Thanks for your pointer of the prototype! > Do you intend to provide exactly-once guarantees for arbitrary updates? As I think about it a little bit more, I think it might make sense to start with the at-least-once semantic first. In practice we make the JDBC call idempotent using `INSERT IF NOT EXISTS`. The exactly-once part is more tricky and let's separate it out for now. What do you think? > Create TableSink for JDBC > - > > Key: FLINK-6281 > URL: https://issues.apache.org/jira/browse/FLINK-6281 > Project: Flink > Issue Type: Improvement >Reporter: Haohui Mai >Assignee: Haohui Mai > > It would be nice to integrate the table APIs with the JDBC connectors so that > the rows in the tables can be directly pushed into JDBC. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3712: [FLINK-6281] Create TableSink for JDBC.
Github user haohui commented on the issue: https://github.com/apache/flink/pull/3712 Thanks for your pointer of the prototype! > Do you intend to provide exactly-once guarantees for arbitrary updates? As I think about it a little bit more, I think it might make sense to start with the at-least-once semantic first. In practice we make the JDBC call idempotent using `INSERT IF NOT EXISTS`. The exactly-once part is more tricky and let's separate it out for now. What do you think?
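The effectively-once argument in the comment — at-least-once delivery plus an idempotent write — can be illustrated without a database. In this toy stand-in for the JDBC sink a Map plays the role of the table, and `writeIfAbsent` mimics an `INSERT IF NOT EXISTS`-style statement (names and structure are invented for illustration; the real sink would issue JDBC statements):

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of an idempotent sink: replayed rows are harmless no-ops. */
class IdempotentSink {
    private final Map<String, String> table = new HashMap<>();

    /** Insert only if the key is absent, like an INSERT IF NOT EXISTS statement. */
    void writeIfAbsent(String key, String value) {
        table.putIfAbsent(key, value);
    }

    int rowCount() { return table.size(); }

    String get(String key) { return table.get(key); }
}
```

Under at-least-once delivery a row may arrive twice after a failure, but because the second write is a no-op the table ends up in the same state as with exactly-once delivery.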
[jira] [Commented] (FLINK-6293) Flakey JobManagerITCase
[ https://issues.apache.org/jira/browse/FLINK-6293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985432#comment-15985432 ] Stephan Ewen commented on FLINK-6293: - Hitting this frequently on local builds as well: {code} Tests run: 21, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1,220.166 sec <<< FAILURE! - in org.apache.flink.runtime.jobmanager.JobManagerITCase The JobManager actor must handle trigger savepoint response for non-existing job(org.apache.flink.runtime.jobmanager.JobManagerITCase) Time elapsed: 1,199.316 sec <<< FAILURE! java.lang.AssertionError: assertion failed: timeout (1199213200030 nanoseconds) during expectMsgClass waiting for class org.apache.flink.runtime.messages.JobManagerMessages$TriggerSavepointFailure at scala.Predef$.assert(Predef.scala:179) at akka.testkit.TestKitBase$class.expectMsgClass_internal(TestKit.scala:423) at akka.testkit.TestKitBase$class.expectMsgType(TestKit.scala:405) at akka.testkit.TestKit.expectMsgType(TestKit.scala:718) at org.apache.flink.runtime.jobmanager.JobManagerITCase$$anonfun$1$$anonfun$apply$mcV$sp$34$$anonfun$apply$mcV$sp$35.apply$mcV$sp(JobManagerITCase.scala:772) at org.apache.flink.runtime.jobmanager.JobManagerITCase$$anonfun$1$$anonfun$apply$mcV$sp$34$$anonfun$apply$mcV$sp$35.apply(JobManagerITCase.scala:764) at org.apache.flink.runtime.jobmanager.JobManagerITCase$$anonfun$1$$anonfun$apply$mcV$sp$34$$anonfun$apply$mcV$sp$35.apply(JobManagerITCase.scala:764) at akka.testkit.TestKitBase$class.within(TestKit.scala:296) at akka.testkit.TestKit.within(TestKit.scala:718) at akka.testkit.TestKitBase$class.within(TestKit.scala:310) at akka.testkit.TestKit.within(TestKit.scala:718) at org.apache.flink.runtime.jobmanager.JobManagerITCase$$anonfun$1$$anonfun$apply$mcV$sp$34.apply$mcV$sp(JobManagerITCase.scala:764) at org.apache.flink.runtime.jobmanager.JobManagerITCase$$anonfun$1$$anonfun$apply$mcV$sp$34.apply(JobManagerITCase.scala:758) at 
org.apache.flink.runtime.jobmanager.JobManagerITCase$$anonfun$1$$anonfun$apply$mcV$sp$34.apply(JobManagerITCase.scala:758) at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:953) at org.scalatest.Suite$class.withFixture(Suite.scala:1122) at org.apache.flink.runtime.jobmanager.JobManagerITCase.withFixture(JobManagerITCase.scala:50) {code} > Flakey JobManagerITCase > --- > > Key: FLINK-6293 > URL: https://issues.apache.org/jira/browse/FLINK-6293 > Project: Flink > Issue Type: Bug > Components: JobManager, Tests >Affects Versions: 1.3.0 >Reporter: Nico Kruber >Priority: Critical > Labels: test-stability > > Quite seldomly, {{JobManagerITCase}} seems to hang, e.g. see > https://api.travis-ci.org/jobs/220888193/log.txt?deansi=true > The maven watchdog kills the build due to not output being produced within > 300s and {{JobManagerITCase}} seems to hang in line 772, i.e. > {code:title=JobManagerITCase lines > 770-772|language=java|linenumbers=true|firstline=770} > // Trigger savepoint for non-existing job > jobManager.tell(TriggerSavepoint(jobId, Option.apply("any")), testActor) > val response = expectMsgType[TriggerSavepointFailure](deadline.timeLeft) > {code} > Although the (downloaded) logs do not quite allow a precise mapping to this > test case, it looks as if the following block may be related: > {code} > 09:34:47,684 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster >- Akka ask timeout set to 100s > 09:34:47,777 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster >- Disabled queryable state server > 09:34:47,777 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster >- Starting FlinkMiniCluster. 
> 09:34:47,809 INFO akka.event.slf4j.Slf4jLogger >- Slf4jLogger started > 09:34:47,837 INFO org.apache.flink.runtime.blob.BlobServer >- Created BLOB server storage directory > /tmp/blobStore-eab23d04-ea18-4dc5-b1df-fcf9fc295062 > 09:34:47,838 WARN org.apache.flink.runtime.net.SSLUtils >- Not a SSL socket, will skip setting tls version and cipher suites. > 09:34:47,839 INFO org.apache.flink.runtime.blob.BlobServer >- Started BLOB server at 0.0.0.0:36745 - max concurrent requests: 50 - max > backlog: 1000 > 09:34:47,840 INFO
[jira] [Updated] (FLINK-6293) Flakey JobManagerITCase
[ https://issues.apache.org/jira/browse/FLINK-6293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen updated FLINK-6293: Priority: Critical (was: Major) > Flakey JobManagerITCase > --- > > Key: FLINK-6293 > URL: https://issues.apache.org/jira/browse/FLINK-6293 > Project: Flink > Issue Type: Bug > Components: JobManager, Tests >Affects Versions: 1.3.0 >Reporter: Nico Kruber >Priority: Critical > Labels: test-stability > > Quite seldomly, {{JobManagerITCase}} seems to hang, e.g. see > https://api.travis-ci.org/jobs/220888193/log.txt?deansi=true > The maven watchdog kills the build due to not output being produced within > 300s and {{JobManagerITCase}} seems to hang in line 772, i.e. > {code:title=JobManagerITCase lines > 770-772|language=java|linenumbers=true|firstline=770} > // Trigger savepoint for non-existing job > jobManager.tell(TriggerSavepoint(jobId, Option.apply("any")), testActor) > val response = expectMsgType[TriggerSavepointFailure](deadline.timeLeft) > {code} > Although the (downloaded) logs do not quite allow a precise mapping to this > test case, it looks as if the following block may be related: > {code} > 09:34:47,684 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster >- Akka ask timeout set to 100s > 09:34:47,777 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster >- Disabled queryable state server > 09:34:47,777 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster >- Starting FlinkMiniCluster. > 09:34:47,809 INFO akka.event.slf4j.Slf4jLogger >- Slf4jLogger started > 09:34:47,837 INFO org.apache.flink.runtime.blob.BlobServer >- Created BLOB server storage directory > /tmp/blobStore-eab23d04-ea18-4dc5-b1df-fcf9fc295062 > 09:34:47,838 WARN org.apache.flink.runtime.net.SSLUtils >- Not a SSL socket, will skip setting tls version and cipher suites. 
> 09:34:47,839 INFO org.apache.flink.runtime.blob.BlobServer >- Started BLOB server at 0.0.0.0:36745 - max concurrent requests: 50 - max > backlog: 1000 > 09:34:47,840 INFO org.apache.flink.runtime.metrics.MetricRegistry >- No metrics reporter configured, no metrics will be exposed/reported. > 09:34:47,850 INFO > org.apache.flink.runtime.testingUtils.TestingMemoryArchivist - Started > memory archivist akka://flink/user/archive_1 > 09:34:47,860 INFO org.apache.flink.runtime.testutils.TestingResourceManager >- Trying to associate with JobManager leader akka://flink/user/jobmanager_1 > 09:34:47,861 INFO org.apache.flink.runtime.testingUtils.TestingJobManager >- Starting JobManager at akka://flink/user/jobmanager_1. > 09:34:47,862 WARN org.apache.flink.runtime.testingUtils.TestingJobManager >- Discard message > LeaderSessionMessage(----,TriggerSavepoint(6e813070338a23b0ff571646bca56521,Some(any))) > because there is currently no valid leader id known. > 09:34:47,862 INFO org.apache.flink.runtime.testingUtils.TestingJobManager >- JobManager akka://flink/user/jobmanager_1 was granted leadership with > leader session ID Some(----). > 09:34:47,867 INFO org.apache.flink.runtime.testutils.TestingResourceManager >- Resource Manager associating with leading JobManager > Actor[akka://flink/user/jobmanager_1#-652927556] - leader session > ---- > {code} > If so, then this may be related to FLINK-6287 and may possibly even be a > duplicate. > What is strange though is that the timeout for the expected message to arrive > is no more than 2m and thus the test should properly fail within 300s. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
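[Editor's note] The puzzling detail above is that `expectMsgType` waited ~1199 seconds even though the intended bound was no more than 2 minutes. A minimal sketch of the deadline pattern the test relies on (`deadline.timeLeft`) illustrates how this can happen: if the deadline is constructed from some much larger outer timeout instead of the intended 2-minute bound, the wait silently outlives the 300s Maven watchdog. `DeadlineSketch` is a hypothetical stand-in, not the Scala `Deadline` or Akka TestKit API.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical sketch of the deadline pattern used in the test: the wait in
 * expectMsgType is bounded by deadline.timeLeft(). A deadline derived from the
 * intended 2-minute bound always fails well inside the 300s watchdog window;
 * one derived from a larger timeout (as the ~1199s observation suggests) does not.
 */
public class DeadlineSketch {
    private final Instant end;

    public DeadlineSketch(Duration timeout) {
        this.end = Instant.now().plus(timeout);
    }

    /** Remaining time, clamped at zero once the deadline has passed. */
    public Duration timeLeft() {
        Duration left = Duration.between(Instant.now(), end);
        return left.isNegative() ? Duration.ZERO : left;
    }

    public static void main(String[] args) {
        DeadlineSketch intended = new DeadlineSketch(Duration.ofMinutes(2));
        // A 2-minute deadline can never produce a wait anywhere near 1199s.
        System.out.println(intended.timeLeft().getSeconds() <= 120);
    }
}
```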
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985423#comment-15985423 ] ASF GitHub Bot commented on FLINK-6225: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113541703 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder builder) { private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;"; private static final ArrayList> collection = new ArrayList<>(20); + private static final ArrayList rowCollection = new ArrayList<>(20); --- End diff -- I see, In that case i would suggest to import the Flink for type and use the qualified name for the Datastax row. > Support Row Stream for CassandraSink > > > Key: FLINK-6225 > URL: https://issues.apache.org/jira/browse/FLINK-6225 > Project: Flink > Issue Type: New Feature > Components: Cassandra Connector >Affects Versions: 1.3.0 >Reporter: Jing Fan >Assignee: Haohui Mai > Fix For: 1.3.0 > > > Currently in CassandraSink, specifying query is not supported for row-stream. > The solution should be similar to CassandraTupleSink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
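[Editor's note] The review comment above is about the standard Java idiom for two classes with the same simple name (here, Flink's `Row` and the Datastax driver's `Row`): import one and spell the other out fully, since Java has no import aliasing. A self-contained sketch of the pattern, using the analogous `java.util.Date` / `java.sql.Date` collision as a stand-in so it runs without the Cassandra driver on the classpath:

```java
import java.util.ArrayList;
import java.util.Date; // imported type keeps its simple name
import java.util.List;

/**
 * Sketch of the single-type-import pattern suggested in the review: one
 * colliding class is imported, the other is referenced by its fully
 * qualified name. java.util.Date / java.sql.Date stand in for the
 * Flink / Datastax Row collision.
 */
public class QualifiedNameSketch {
    // the imported type uses the short name...
    static final List<Date> utilDates = new ArrayList<>();
    // ...while the colliding type is written out fully
    static final List<java.sql.Date> sqlDates = new ArrayList<>();

    public static void main(String[] args) {
        utilDates.add(new Date(0L));
        sqlDates.add(new java.sql.Date(0L));
        System.out.println(utilDates.size() + sqlDates.size()); // prints 2
    }
}
```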
[jira] [Commented] (FLINK-6287) Flakey JobManagerRegistrationTest
[ https://issues.apache.org/jira/browse/FLINK-6287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985404#comment-15985404 ] Stephan Ewen commented on FLINK-6287: - A recent PR has addressed some instability, but I am still seeing failures quite frequently on a local build: {code} org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest The JobManager should handle repeated registration calls(org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest) Time elapsed: 3.103 sec <<< FAILURE! java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsgClass waiting for class org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage at scala.Predef$.assert(Predef.scala:179) at akka.testkit.TestKitBase$class.expectMsgClass_internal(TestKit.scala:423) at akka.testkit.TestKitBase$class.expectMsgType(TestKit.scala:396) at akka.testkit.TestKit.expectMsgType(TestKit.scala:718) at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply$mcV$sp(JobManagerRegistrationTest.scala:188) at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply(JobManagerRegistrationTest.scala:163) at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply(JobManagerRegistrationTest.scala:163) at akka.testkit.TestKitBase$class.within(TestKit.scala:296) at akka.testkit.TestKit.within(TestKit.scala:718) at akka.testkit.TestKitBase$class.within(TestKit.scala:310) at akka.testkit.TestKit.within(TestKit.scala:718) at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply$mcV$sp(JobManagerRegistrationTest.scala:163) at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(JobManagerRegistrationTest.scala:142) at 
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(JobManagerRegistrationTest.scala:142) at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:953) at org.scalatest.Suite$class.withFixture(Suite.scala:1122) at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.withFixture(JobManagerRegistrationTest.scala:51) {code} > Flakey JobManagerRegistrationTest > - > > Key: FLINK-6287 > URL: https://issues.apache.org/jira/browse/FLINK-6287 > Project: Flink > Issue Type: Bug > Components: JobManager, Tests >Affects Versions: 1.3.0 > Environment: unit tests >Reporter: Nico Kruber > Labels: test-stability > > There seems to be a race condition in the "{{JobManagerRegistrationTest.The > JobManager should handle repeated registration calls}}" (scala) unit test. > Every so often, especially when my system is under load, this test fails with > a timeout after seeing the following messages in the log4j INFO outputs: > {code} > 14:18:42,257 INFO org.apache.flink.runtime.testutils.TestingResourceManager - > Trying to associate with JobManager leader akka://flink/user/$f#-1062324203 > 14:18:42,253 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting > JobManager at akka://flink/user/$f. > 14:18:42,258 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard > message > LeaderSessionMessage(----,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c > @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, > heap=1556938752, managed=10,1)) because there is currently no valid leader id > known. 
> 14:18:42,259 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard > message > LeaderSessionMessage(----,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c > @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, > heap=1556938752, managed=10,1)) because there is currently no valid leader id > known. > 14:18:42,259 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard > message > LeaderSessionMessage(----,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c > @ nico-work.fritz.box
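[Editor's note] The repeated "Discard message ... because there is currently no valid leader id known" warnings above point at the race: registration messages arriving before leadership is granted are dropped, so a single fire-and-forget registration can be lost. A hypothetical sketch (not Flink's actual actor code) of the resend-until-acknowledged behavior such a test exercises:

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical model of the race in JobManagerRegistrationTest: messages sent
 * before the leader session id is known are silently discarded, so the sender
 * must retry until one registration is accepted.
 */
public class RegistrationRetrySketch {
    static final AtomicBoolean leaderKnown = new AtomicBoolean(false);
    static int discarded = 0;

    /** Returns true iff the registration was accepted (leader id known). */
    static boolean register() {
        if (!leaderKnown.get()) {
            discarded++; // "Discard message ... no valid leader id known"
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        boolean registered = false;
        for (int attempt = 0; attempt < 5 && !registered; attempt++) {
            registered = register();
            if (attempt == 2) {
                leaderKnown.set(true); // leadership granted concurrently
            }
        }
        // The first attempts are discarded; a later retry succeeds.
        System.out.println(registered + " after " + discarded + " discards");
    }
}
```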
[jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3
[ https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985378#comment-15985378 ] ASF GitHub Bot commented on FLINK-5969: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3778#discussion_r113446953 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointFrom12MigrationITCase.java --- @@ -0,0 +1,771 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing.utils; + +import static org.junit.Assert.assertEquals; + +import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import 
org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Collector; +import org.junit.Ignore; +import org.junit.Test; + +/** + * This verifies that we can restore a complete job from a Flink 1.2 savepoint. + * + * The test pipeline contains both "Checkpointed" state and keyed user state. + * + * The tests will time out if they don't see the required number of successful checks within + * a time limit. + */ +public class StatefulUDFSavepointFrom12MigrationITCase extends SavepointMigrationTestBase { + private static final int NUM_SOURCE_ELEMENTS = 4; + + /** +* This has to be manually executed to create the savepoint on Flink 1.2. +*/ + @Test + @Ignore + public void testCreateSavepointOnFlink12() throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + // we only test memory state backend yet + env.setStateBackend(new MemoryStateBackend()); + env.enableCheckpointing(500); + env.setParallelism(4); +
[jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3
[ https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985379#comment-15985379 ] ASF GitHub Bot commented on FLINK-5969: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3778#discussion_r113441899 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java --- @@ -155,12 +155,14 @@ public void snapshotState(FSDataOutputStream out, long checkpointId, long timest @Override public void restoreState(FSDataInputStream in) throws Exception { + boolean readFlag = false; if (userFunction instanceof Checkpointed || - (userFunction instanceof CheckpointedRestoring && in instanceof Migration)) { + (userFunction instanceof CheckpointedRestoring)) { --- End diff -- remove braces around `userFunction instanceof CheckpointedRestoring`; could also move it to the previous line. > Add savepoint backwards compatibility tests from 1.2 to 1.3 > --- > > Key: FLINK-5969 > URL: https://issues.apache.org/jira/browse/FLINK-5969 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.3.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.0 > > > We currently only have tests that test migration from 1.1 to 1.3, because we > added these tests when releasing Flink 1.2. > We have to copy/migrate those tests: > - {{StatefulUDFSavepointMigrationITCase}} > - {{*MigrationTest}} > - {{AbstractKeyedCEPPatternOperator}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3
[ https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985375#comment-15985375 ] ASF GitHub Bot commented on FLINK-5969: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3778#discussion_r113435588 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java --- @@ -489,6 +489,39 @@ public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException } /** +* Reattaches to a running from from the supplied job id --- End diff -- typo: from -> job > Add savepoint backwards compatibility tests from 1.2 to 1.3 > --- > > Key: FLINK-5969 > URL: https://issues.apache.org/jira/browse/FLINK-5969 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.3.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.0 > > > We currently only have tests that test migration from 1.1 to 1.3, because we > added these tests when releasing Flink 1.2. > We have to copy/migrate those tests: > - {{StatefulUDFSavepointMigrationITCase}} > - {{*MigrationTest}} > - {{AbstractKeyedCEPPatternOperator}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3
[ https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985381#comment-15985381 ] ASF GitHub Bot commented on FLINK-5969: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3778#discussion_r113435697 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java --- @@ -489,6 +489,39 @@ public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException } /** +* Reattaches to a running from from the supplied job id +* +* @param jobID The job id of the job to attach to +* @return The JobExecutionResult for the jobID +* @throws JobExecutionException if an error occurs during monitoring the job execution +*/ + public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException { + final LeaderRetrievalService leaderRetrievalService; + try { + leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig); + } catch (Exception e) { + throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e); + } + + ActorGateway jobManagerGateway; + try { + jobManagerGateway = getJobManagerGateway(); + } catch (Exception e) { + throw new JobRetrievalException(jobID, "Could not retrieve the JobManager Gateway"); --- End diff -- Include the exception > Add savepoint backwards compatibility tests from 1.2 to 1.3 > --- > > Key: FLINK-5969 > URL: https://issues.apache.org/jira/browse/FLINK-5969 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.3.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.0 > > > We currently only have tests that test migration from 1.1 to 1.3, because we > added these tests when releasing Flink 1.2. 
> We have to copy/migrate those tests: > - {{StatefulUDFSavepointMigrationITCase}} > - {{*MigrationTest}} > - {{AbstractKeyedCEPPatternOperator}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
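[Editor's note] The "Include the exception" review point above is the usual exception-chaining rule: pass the caught exception as the cause when wrapping it, so the original stack trace survives. A minimal sketch; the `JobRetrievalException` here is a hypothetical stand-in for the Flink class:

```java
/**
 * Sketch of the review suggestion: wrap the caught exception as the cause so
 * getCause() is non-null and the original stack trace is preserved.
 * JobRetrievalException is a minimal stand-in, not the Flink class.
 */
public class ChainingSketch {
    static class JobRetrievalException extends RuntimeException {
        JobRetrievalException(String jobId, String msg) {
            super(jobId + ": " + msg); // reviewed code: cause is lost
        }
        JobRetrievalException(String jobId, String msg, Throwable cause) {
            super(jobId + ": " + msg, cause); // suggested fix: cause is chained
        }
    }

    static JobRetrievalException wrap(String jobId, Exception e) {
        return new JobRetrievalException(jobId, "Could not retrieve the JobManager Gateway", e);
    }

    public static void main(String[] args) {
        Exception cause = new IllegalStateException("gateway lookup failed");
        JobRetrievalException wrapped = wrap("job-1", cause);
        System.out.println(wrapped.getCause() == cause); // prints true
    }
}
```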
[jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3
[ https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985380#comment-15985380 ] ASF GitHub Bot commented on FLINK-5969: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3778#discussion_r113435639 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java --- @@ -489,6 +489,39 @@ public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException } /** +* Reattaches to a running from from the supplied job id +* +* @param jobID The job id of the job to attach to +* @return The JobExecutionResult for the jobID +* @throws JobExecutionException if an error occurs during monitoring the job execution +*/ + public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException { + final LeaderRetrievalService leaderRetrievalService; + try { + leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig); + } catch (Exception e) { + throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e); + } + + ActorGateway jobManagerGateway; + try { + jobManagerGateway = getJobManagerGateway(); + } catch (Exception e) { + throw new JobRetrievalException(jobID, "Could not retrieve the JobManager Gateway"); + } + + return JobClient.attachToRunningJob( + jobID, + jobManagerGateway, + flinkConfig, + actorSystemLoader.get(), + leaderRetrievalService, + timeout, + printStatusDuringExecution); + } + + --- End diff -- remove empty line > Add savepoint backwards compatibility tests from 1.2 to 1.3 > --- > > Key: FLINK-5969 > URL: https://issues.apache.org/jira/browse/FLINK-5969 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.3.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.0 > > > We currently only have tests that test migration from 1.1 to 1.3, because we > added these tests when 
releasing Flink 1.2. > We have to copy/migrate those tests: > - {{StatefulUDFSavepointMigrationITCase}} > - {{*MigrationTest}} > - {{AbstractKeyedCEPPatternOperator}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3
[ https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985376#comment-15985376 ] ASF GitHub Bot commented on FLINK-5969: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3778#discussion_r113442453 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java --- @@ -155,12 +155,14 @@ public void snapshotState(FSDataOutputStream out, long checkpointId, long timest @Override public void restoreState(FSDataInputStream in) throws Exception { + boolean readFlag = false; --- End diff -- the variable name is a bit ambiguous, (it could also mean "you have to read the flag". > Add savepoint backwards compatibility tests from 1.2 to 1.3 > --- > > Key: FLINK-5969 > URL: https://issues.apache.org/jira/browse/FLINK-5969 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.3.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.0 > > > We currently only have tests that test migration from 1.1 to 1.3, because we > added these tests when releasing Flink 1.2. > We have to copy/migrate those tests: > - {{StatefulUDFSavepointMigrationITCase}} > - {{*MigrationTest}} > - {{AbstractKeyedCEPPatternOperator}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
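[Editor's note] The naming point above — `readFlag` parses both as the noun "a flag that was read" and the imperative "read the flag" — is easily fixed with a past-participle name. A self-contained sketch of the read-once guard, using the hypothetical name `flagAlreadyRead` (not the name actually adopted in the PR):

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

/**
 * Sketch of an unambiguous name for the PR's read-once guard: a
 * past-participle form like flagAlreadyRead states that the flag byte has
 * already been consumed from the stream, which `readFlag` leaves ambiguous.
 */
public class RestoreFlagSketch {
    /** Reads the leading flag byte at most once; returns -1 on I/O failure. */
    static int restore(DataInputStream in) {
        boolean flagAlreadyRead = false; // clearer than `readFlag`
        int flag = -1;
        try {
            if (!flagAlreadyRead) {
                flag = in.read();
                flagAlreadyRead = true;
            }
        } catch (IOException e) {
            return -1;
        }
        return flag;
    }

    public static void main(String[] args) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(new byte[]{42}));
        System.out.println(restore(in)); // prints 42
    }
}
```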
[jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3
[ https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985377#comment-15985377 ] ASF GitHub Bot commented on FLINK-5969: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3778#discussion_r113440360 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java --- @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.hdfstests; + +import java.io.FileOutputStream; +import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.io.TextInputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction; +import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator; +import org.apache.flink.streaming.api.functions.source.FileProcessingMode; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OperatorSnapshotUtil; +import org.apache.flink.util.Preconditions; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; + +public class ContinuousFileProcessingFrom12MigrationTest { + + private static final int LINES_PER_FILE = 10; + + private static final long INTERVAL = 100; + + @ClassRule + public static TemporaryFolder tempFolder = 
new TemporaryFolder(); + + /** +* Manually run this to write binary snapshot data. Remove @Ignore to run. +*/ + @Ignore + @Test + public void writeReaderSnapshot() throws Exception { + + File testFolder = tempFolder.newFolder(); + + TimestampedFileInputSplit split1 = + new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null); + + TimestampedFileInputSplit split2 = + new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null); + + TimestampedFileInputSplit split3 = + new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null); + + TimestampedFileInputSplit split4 = + new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null); + + final OneShotLatch latch = new OneShotLatch(); + BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(testFolder.getAbsolutePath())); + TypeInformation typeInfo = TypeExtractor.getInputFormatTypes(format); + ContinuousFileReaderOperator initReader = new ContinuousFileReaderOperator<>( + format); +
[GitHub] flink pull request #3778: [FLINK-5969] Add savepoint backwards compatibility...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3778#discussion_r113435697

--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---
@@ -489,6 +489,39 @@ public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException
 	}

 	/**
+	 * Reattaches to a running from from the supplied job id
+	 *
+	 * @param jobID The job id of the job to attach to
+	 * @return The JobExecutionResult for the jobID
+	 * @throws JobExecutionException if an error occurs during monitoring the job execution
+	 */
+	public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException {
+		final LeaderRetrievalService leaderRetrievalService;
+		try {
+			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+		} catch (Exception e) {
+			throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e);
+		}
+
+		ActorGateway jobManagerGateway;
+		try {
+			jobManagerGateway = getJobManagerGateway();
+		} catch (Exception e) {
+			throw new JobRetrievalException(jobID, "Could not retrieve the JobManager Gateway");
--- End diff --

Include the exception
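The reviewer is asking for plain exception chaining: pass the caught exception as the constructor's cause argument, as the sibling catch block above it already does. A minimal sketch of why chaining matters — the class and method names here are stand-ins for illustration, not Flink's actual types:

```java
// Hypothetical stand-in for Flink's JobRetrievalException; the point is
// only that the caught exception is carried along as the cause.
class RetrievalException extends Exception {
    RetrievalException(String message, Throwable cause) {
        super(message, cause);
    }
}

public class CauseChainingSketch {
    // Wraps a low-level failure, preserving it as the cause so the
    // original stack trace and message survive into logs and test output.
    static RetrievalException wrap(Exception e) {
        return new RetrievalException("Could not retrieve the JobManager Gateway", e);
    }

    public static void main(String[] args) {
        RetrievalException ex = wrap(new IllegalStateException("actor system down"));
        // Without chaining, this root-cause message would be lost.
        System.out.println(ex.getCause().getMessage());
    }
}
```

Dropping the cause, as the flagged line does, leaves only the generic wrapper message to debug from.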
[GitHub] flink pull request #3778: [FLINK-5969] Add savepoint backwards compatibility...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3778#discussion_r113446953 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointFrom12MigrationITCase.java --- @@ -0,0 +1,771 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing.utils; + +import static org.junit.Assert.assertEquals; + +import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import 
org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Collector; +import org.junit.Ignore; +import org.junit.Test; + +/** + * This verifies that we can restore a complete job from a Flink 1.2 savepoint. + * + * The test pipeline contains both "Checkpointed" state and keyed user state. + * + * The tests will time out if they don't see the required number of successful checks within + * a time limit. + */ +public class StatefulUDFSavepointFrom12MigrationITCase extends SavepointMigrationTestBase { + private static final int NUM_SOURCE_ELEMENTS = 4; + + /** +* This has to be manually executed to create the savepoint on Flink 1.2. +*/ + @Test + @Ignore + public void testCreateSavepointOnFlink12() throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + // we only test memory state backend yet + env.setStateBackend(new MemoryStateBackend()); + env.enableCheckpointing(500); + env.setParallelism(4); + env.setMaxParallelism(4); + + env + .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource") + .flatMap(new
[GitHub] flink pull request #3778: [FLINK-5969] Add savepoint backwards compatibility...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3778#discussion_r113442453

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java ---
@@ -155,12 +155,14 @@ public void snapshotState(FSDataOutputStream out, long checkpointId, long timest
 	@Override
 	public void restoreState(FSDataInputStream in) throws Exception {
+		boolean readFlag = false;
--- End diff --

the variable name is a bit ambiguous (it could also mean "you have to read the flag").
[GitHub] flink pull request #3778: [FLINK-5969] Add savepoint backwards compatibility...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3778#discussion_r113435588

--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---
@@ -489,6 +489,39 @@ public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException
 	}

 	/**
+	 * Reattaches to a running from from the supplied job id
--- End diff --

typo: from -> job
[GitHub] flink pull request #3778: [FLINK-5969] Add savepoint backwards compatibility...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3778#discussion_r113440360 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java --- @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.hdfstests; + +import java.io.FileOutputStream; +import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.io.TextInputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction; +import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator; +import org.apache.flink.streaming.api.functions.source.FileProcessingMode; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OperatorSnapshotUtil; +import org.apache.flink.util.Preconditions; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; + +public class ContinuousFileProcessingFrom12MigrationTest { + + private static final int LINES_PER_FILE = 10; + + private static final long INTERVAL = 100; + + @ClassRule + public static TemporaryFolder tempFolder = 
new TemporaryFolder(); + + /** +* Manually run this to write binary snapshot data. Remove @Ignore to run. +*/ + @Ignore + @Test + public void writeReaderSnapshot() throws Exception { + + File testFolder = tempFolder.newFolder(); + + TimestampedFileInputSplit split1 = + new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null); + + TimestampedFileInputSplit split2 = + new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null); + + TimestampedFileInputSplit split3 = + new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null); + + TimestampedFileInputSplit split4 = + new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null); + + final OneShotLatch latch = new OneShotLatch(); + BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(testFolder.getAbsolutePath())); + TypeInformation typeInfo = TypeExtractor.getInputFormatTypes(format); + ContinuousFileReaderOperator initReader = new ContinuousFileReaderOperator<>( + format); + initReader.setOutputType(typeInfo, new ExecutionConfig()); + OneInputStreamOperatorTestHarnesstestHarness = + new OneInputStreamOperatorTestHarness<>(initReader);
[jira] [Commented] (FLINK-6175) HistoryServerTest.testFullArchiveLifecycle fails
[ https://issues.apache.org/jira/browse/FLINK-6175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15986286#comment-15985286 ] ASF GitHub Bot commented on FLINK-6175: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3655 While at hardening: The test seems to work with a fixed port. That is bound to cause failures due to conflicts (for example 8082 is commonly used if you have a local HA Flink for testing). > HistoryServerTest.testFullArchiveLifecycle fails > > > Key: FLINK-6175 > URL: https://issues.apache.org/jira/browse/FLINK-6175 > Project: Flink > Issue Type: Test > Components: History Server, Tests, Webfrontend >Reporter: Ufuk Celebi >Assignee: Chesnay Schepler > > https://s3.amazonaws.com/archive.travis-ci.org/jobs/213933823/log.txt > {code} > testFullArchiveLifecycle(org.apache.flink.runtime.webmonitor.history.HistoryServerTest) > Time elapsed: 2.162 sec <<< FAILURE! > java.lang.AssertionError: /joboverview.json did not contain valid json > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.assertTrue(Assert.java:41) > at org.junit.Assert.assertNotNull(Assert.java:712) > at > org.apache.flink.runtime.webmonitor.history.HistoryServerTest.testFullArchiveLifecycle(HistoryServerTest.java:98) > {code} > Happened on a branch with unrelated changes [~Zentol]. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5752) Support push down projections for HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-5752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985287#comment-15985287 ] ASF GitHub Bot commented on FLINK-5752: --- Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3760 I checked the recent failure. `Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 20.417 sec <<< FAILURE! - in org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest` This failure seems unrelated with the changes in this patch. @tonycox , @fhueske Just a gentle ping. > Support push down projections for HBaseTableSource > -- > > Key: FLINK-5752 > URL: https://issues.apache.org/jira/browse/FLINK-5752 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: ramkrishna.s.vasudevan >Assignee: ramkrishna.s.vasudevan > > This is after the discussion to create NestedProjectableTableSource. > Currently we support nested schema for the non-relational type of DBs like > HBase. > But this does not allow push down projection. This JIRA is to implement that. > Once FLINK-5698 is implemented then we should be making use of the feature to > push down the projections for a nested table. So in case of HBase if we have > {f1:{a, b}, f2:{c, d}} as the nested structure then if we have a scan query > that needs to select f2.c - then we should be specifically able to project > only that column 'c' under 'f2'. FLINK-5698 plans to add a new API for such > projections and HBaseTableSource should make use of that API to do the > projection. > [~fhueske], [~tonycox], [~jark] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
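The mapping the issue asks for can be illustrated with a toy sketch — no HBase API involved, and all names here are invented for illustration: a projected nested field path like `f2.c` becomes a single (family, qualifier) pair that a scan would request, so the unprojected columns are never read.

```java
import java.util.ArrayList;
import java.util.List;

public class NestedProjectionSketch {
    // Splits each projected field path "family.qualifier" into its two
    // parts; a real table source would hand these to the scan so that
    // only column 'c' under family 'f2' is fetched, not the whole row.
    static List<String[]> toScanColumns(List<String> fieldPaths) {
        List<String[]> columns = new ArrayList<>();
        for (String path : fieldPaths) {
            columns.add(path.split("\\.", 2));
        }
        return columns;
    }

    public static void main(String[] args) {
        for (String[] col : toScanColumns(List.of("f2.c"))) {
            System.out.println("family=" + col[0] + " qualifier=" + col[1]);
        }
    }
}
```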
[GitHub] flink issue #3760: FLINK-5752 Support push down projections for HBaseTableSo...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3760 I checked the recent failure. `Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 20.417 sec <<< FAILURE! - in org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest` This failure seems unrelated to the changes in this patch. @tonycox , @fhueske Just a gentle ping.
[GitHub] flink issue #3655: [FLINK-6175] Harden HistoryServerTest#testFullArchiveLife...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3655 While at hardening: The test seems to work with a fixed port. That is bound to cause failures due to conflicts (for example 8082 is commonly used if you have a local HA Flink for testing).
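The usual fix for hard-coded test ports is to let the OS pick one. A sketch, assuming the test server can be started on an arbitrary port (class and method names are invented, not part of the PR):

```java
import java.io.IOException;
import java.net.ServerSocket;

public class EphemeralPortSketch {
    // Binding to port 0 asks the OS for any free ephemeral port, so
    // parallel test runs never collide on a hard-coded port like 8082.
    static int pickFreePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new RuntimeException("no free port available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("picked port: " + pickFreePort());
    }
}
```

Note that closing the probe socket before the test server rebinds leaves a small race window; binding the server itself to port 0 and then querying its actual port avoids even that.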
[jira] [Commented] (FLINK-6048) Asynchronous snapshots for heap-based operator state backends
[ https://issues.apache.org/jira/browse/FLINK-6048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985265#comment-15985265 ] ASF GitHub Bot commented on FLINK-6048: --- Github user StefanRRichter closed the pull request at: https://github.com/apache/flink/pull/3536 > Asynchronous snapshots for heap-based operator state backends > - > > Key: FLINK-6048 > URL: https://issues.apache.org/jira/browse/FLINK-6048 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Stefan Richter >Assignee: Stefan Richter > Fix For: 1.3.0 > > > The synchronous checkpointing mechanism of heap-based operator state backends > blocks element processing for the duration of the checkpoint. > We could implement an heap-based operator state backend that allows for > asynchronous checkpoints. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
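The idea in the issue description — stop blocking element processing for the duration of the checkpoint — can be sketched as a two-phase snapshot: a short synchronous phase that captures the state, and an asynchronous phase that writes it out on a background thread. This is a toy illustration of the concept, not Flink's actual backend (which uses copy-on-write structures instead of a full map copy):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AsyncSnapshotSketch {
    private final Map<String, Long> state = new HashMap<>();
    private final ExecutorService snapshotWriter =
        Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "snapshot-writer");
            t.setDaemon(true); // don't keep the JVM alive after main exits
            return t;
        });

    void update(String key, long value) {
        state.put(key, value);
    }

    // Synchronous part: capture a consistent copy of the state.
    // Asynchronous part: hand the copy to a background thread so element
    // processing is not blocked while the checkpoint is written.
    Future<Integer> snapshot() {
        final Map<String, Long> copy = new HashMap<>(state);
        return snapshotWriter.submit(() -> {
            // A real backend would serialize 'copy' to the checkpoint
            // stream here; we just report how many entries were captured.
            return copy.size();
        });
    }

    public static void main(String[] args) throws Exception {
        AsyncSnapshotSketch backend = new AsyncSnapshotSketch();
        backend.update("a", 1L);
        Future<Integer> checkpoint = backend.snapshot();
        backend.update("b", 2L); // processing continues during the snapshot
        System.out.println("entries in snapshot: " + checkpoint.get());
    }
}
```

The update made after `snapshot()` is invisible to the in-flight checkpoint, which is exactly the isolation an asynchronous snapshot has to guarantee.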
[GitHub] flink pull request #3536: [FLINK-6048] Asynchronous snapshots for heap-based...
Github user StefanRRichter closed the pull request at: https://github.com/apache/flink/pull/3536
[GitHub] flink pull request #3736: [FLINK-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r113519219 --- Diff: flink-metrics/flink-metrics-datadog/pom.xml --- @@ -0,0 +1,109 @@ + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + + org.apache.flink + flink-metrics + 1.3-SNAPSHOT + .. + + + flink-metrics-datadog + flink-metrics-datadog + + + + org.apache.flink + flink-metrics-core + ${project.version} + provided + + + + com.fasterxml.jackson.core + jackson-databind + provided + + + + com.squareup.okhttp3 + okhttp + 3.6.0 + + + + com.squareup.okio + okio + 1.11.0 + + + + + + + org.apache.flink + flink-runtime_2.10 + ${project.version} + test + + + + org.apache.flink + flink-test-utils-junit + ${project.version} + test + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + jar-with-dependencies --- End diff -- yes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985247#comment-15985247 ] ASF GitHub Bot commented on FLINK-6013: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r113519219 --- Diff: flink-metrics/flink-metrics-datadog/pom.xml --- @@ -0,0 +1,109 @@ + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + + org.apache.flink + flink-metrics + 1.3-SNAPSHOT + .. + + + flink-metrics-datadog + flink-metrics-datadog + + + + org.apache.flink + flink-metrics-core + ${project.version} + provided + + + + com.fasterxml.jackson.core + jackson-databind + provided + + + + com.squareup.okhttp3 + okhttp + 3.6.0 + + + + com.squareup.okio + okio + 1.11.0 + + + + + + + org.apache.flink + flink-runtime_2.10 + ${project.version} + test + + + + org.apache.flink + flink-test-utils-junit + ${project.version} + test + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + jar-with-dependencies --- End diff -- yes. > Add Datadog HTTP metrics reporter > - > > Key: FLINK-6013 > URL: https://issues.apache.org/jira/browse/FLINK-6013 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.3.0 > > > We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a > lot other companies also do. > Flink right now only has a StatsD metrics reporter, and users have to set up > Datadog Agent in order to receive metrics from StatsD and transport them to > Datadog. We don't like this approach. > We prefer to have a Datadog metrics reporter directly contacting Datadog http > endpoint. > I'll take this ticket myself. 
[jira] [Commented] (FLINK-6338) SimpleStringUtils should be called StringValueUtils
[ https://issues.apache.org/jira/browse/FLINK-6338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985245#comment-15985245 ] ASF GitHub Bot commented on FLINK-6338: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3783 Please fix the PR title; you are referencing the wrong JIRA. > SimpleStringUtils should be called StringValueUtils > --- > > Key: FLINK-6338 > URL: https://issues.apache.org/jira/browse/FLINK-6338 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.2.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Trivial > Fix For: 1.3.0 > >
[GitHub] flink issue #3783: [FLINK-6338] Add support for DISTINCT into Code Generated...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3783 Please fix the PR title; you are referencing the wrong JIRA.
[jira] [Commented] (FLINK-6048) Asynchronous snapshots for heap-based operator state backends
[ https://issues.apache.org/jira/browse/FLINK-6048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985237#comment-15985237 ] ASF GitHub Bot commented on FLINK-6048: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3536 Thanks for the review @tillrohrmann. I addressed your comments concerning unregistration of streams. Will merge this now. > Asynchronous snapshots for heap-based operator state backends > - > > Key: FLINK-6048 > URL: https://issues.apache.org/jira/browse/FLINK-6048 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Stefan Richter >Assignee: Stefan Richter > Fix For: 1.3.0 > > > The synchronous checkpointing mechanism of heap-based operator state backends > blocks element processing for the duration of the checkpoint. > We could implement an heap-based operator state backend that allows for > asynchronous checkpoints. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3536: [FLINK-6048] Asynchronous snapshots for heap-based Operat...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3536 Thanks for the review @tillrohrmann. I addressed your comments concerning unregistration of streams. Will merge this now.
[jira] [Commented] (FLINK-6107) Add custom checkstyle for flink-streaming-java
[ https://issues.apache.org/jira/browse/FLINK-6107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985221#comment-15985221 ] Stephan Ewen commented on FLINK-6107: -

I would like to make a suggestion for an adjustment to the style, specifically to the import order. Basically all other files in Flink have the pattern:
# Flink / library imports
# java.foo.bar imports
# static imports

Also, most files place spaces between the components that the imports are derived from. It may just be me, being an old-school guy that looks at the imports quite a bit (almost for every new class I open, I find it useful to get an initial overview of what a class interacts with), but I find the previous style much better for a "quick glance" impression:
- Spaces between logical groups (Flink / logger / library / etc.)
- Flink and libraries first (it's what matters to get the overview)
- java below (not really important for the overview)
- static imports last (they are just syntactic sugar and not required for any understanding)

So, why don't we keep that style? It would also result in less reformatting and fewer merge conflicts. I pasted the examples below to illustrate that:

h4.
Original formatting of most files
{code}
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayDeque;

import static org.apache.flink.util.Preconditions.checkArgument;
{code}

h4. New Format
{code}
import static org.apache.flink.util.Preconditions.checkArgument;

import java.io.IOException;
import java.util.ArrayDeque;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
{code}
> Add custom checkstyle for flink-streaming-java > -- > > Key: FLINK-6107 > URL: https://issues.apache.org/jira/browse/FLINK-6107 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For:
1.3.0 > > > There was some consensus on the ML > (https://lists.apache.org/thread.html/94c8c5186b315c58c3f8aaf536501b99e8b92ee97b0034dee295ff6a@%3Cdev.flink.apache.org%3E) > that we want to have a more uniform code style. We should start > module-by-module and by introducing increasingly stricter rules. We have to > be aware of the PR situation and ensure that we have minimal breakage for > contributors. > This issue aims at adding a custom checkstyle.xml for > {{flink-streaming-java}} that is based on our current checkstyle.xml but adds > these checks for Javadocs: > {code} > > > > > > > > > > > > > > > > > > > > > > > > > {code} > This checks: > - Every type has a type-level Javadoc > - Proper use of {{}} in Javadocs > - First sentence must end with a proper punctuation mark > - Proper use (including closing) of HTML tags -- This message was sent by Atlassian JIRA (v6.3.15#6346)
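For reference, the grouping Stephan describes can be expressed with checkstyle's standard CustomImportOrder module. This is a hedged sketch of such a configuration, not the rule set the issue or PR actually adds; the regular expression and group order are assumptions that would need tuning:

```xml
<!-- Hypothetical checkstyle fragment: org.apache.flink imports first,
     other third-party libraries next, java.* after that, static imports
     last, with a blank line required between groups. -->
<module name="CustomImportOrder">
  <property name="specialImportsRegExp" value="^org\.apache\.flink\."/>
  <property name="customImportOrderRules"
            value="SPECIAL_IMPORTS###THIRD_PARTY_PACKAGE###STANDARD_JAVA_PACKAGE###STATIC"/>
  <property name="separateLineBetweenGroups" value="true"/>
  <property name="sortImportsInGroupAlphabetically" value="true"/>
</module>
```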
[jira] [Commented] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance
[ https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985213#comment-15985213 ] ASF GitHub Bot commented on FLINK-3722: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/3511 @heytitle please remove the old FLINK-3722 commits and rebase to master. > The divisions in the InMemorySorters' swap/compare methods hurt performance > --- > > Key: FLINK-3722 > URL: https://issues.apache.org/jira/browse/FLINK-3722 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Gabor Gevay >Assignee: Greg Hogan >Priority: Minor > Labels: performance > Fix For: 1.3.0 > > > NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods > use divisions (which take a lot of time \[1\]) to calculate the index of the > MemorySegment and the offset inside the segment. [~greghogan] reported on the > mailing list \[2\] measuring a ~12-14% performance effect in one case. > A possibility to improve the situation is the following: > The way that QuickSort mostly uses these compare and swap methods is that it > maintains two indices, and uses them to call compare and swap. The key > observation is that these indices are mostly stepped by one, and > _incrementally_ calculating the quotient and modulo is actually easy when the > index changes only by one: increment/decrement the modulo, and check whether > the modulo has reached 0 or the divisor, and if it did, then wrap-around the > modulo and increment/decrement the quotient. > To implement this, InMemorySorter would have to expose an iterator that would > have the divisor and the current modulo and quotient as state, and have > increment/decrement methods. Compare and swap could then have overloads that > take these iterators as arguments. 
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf > \[2\] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html
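The incremental quotient/modulo scheme the issue describes can be sketched as follows — an illustration of the arithmetic only, with invented names, not the proposed InMemorySorter iterator API:

```java
public class IncrementalIndexSketch {
    final int recordsPerSegment; // the divisor
    int segment; // running quotient: index / recordsPerSegment
    int offset;  // running modulo:   index % recordsPerSegment

    IncrementalIndexSketch(int recordsPerSegment) {
        this.recordsPerSegment = recordsPerSegment;
    }

    // Stepping by one replaces a division with an increment and a compare.
    void increment() {
        offset++;
        if (offset == recordsPerSegment) { // wrap around into the next segment
            offset = 0;
            segment++;
        }
    }

    void decrement() {
        if (offset == 0) { // wrap back into the previous segment
            offset = recordsPerSegment - 1;
            segment--;
        } else {
            offset--;
        }
    }

    public static void main(String[] args) {
        IncrementalIndexSketch it = new IncrementalIndexSketch(32);
        for (int i = 0; i < 1000; i++) {
            // invariant: matches the division-based computation at every step
            if (it.segment != i / 32 || it.offset != i % 32) {
                throw new AssertionError("mismatch at index " + i);
            }
            it.increment();
        }
        System.out.println("incremental quotient/modulo matched division for 1000 steps");
    }
}
```

Since QuickSort mostly moves its two indices by one, almost every compare/swap can use this cheap path instead of the division.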
[GitHub] flink issue #3511: [Flink-5734] code generation for normalizedkey sorter
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/3511 @heytitle please remove the old FLINK-3722 commits and rebase to master.
[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985209#comment-15985209 ] ASF GitHub Bot commented on FLINK-6250: --- Github user stefanobortoli closed the pull request at: https://github.com/apache/flink/pull/3771 > Distinct procTime with Rows boundaries > -- > > Key: FLINK-6250 > URL: https://issues.apache.org/jira/browse/FLINK-6250 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: radu >Assignee: Stefano Bortoli > > Support proctime with rows boundaries > Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 > PRECEDING AND CURRENT ROW) FROM stream1` > Q1.1. `SELECT COUNT(b), SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS > BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
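The semantics of `SUM(DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)` can be sketched outside Flink with a plain sliding buffer. The class and method names below are illustrative, not part of the proposed implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;

public class DistinctRowsWindow {
    /** For each row, sum the distinct values among the current row and the N preceding rows. */
    public static List<Integer> sumDistinctOverRows(int[] b, int precedingRows) {
        List<Integer> out = new ArrayList<>();
        Deque<Integer> window = new ArrayDeque<>();
        for (int value : b) {
            window.addLast(value);
            if (window.size() > precedingRows + 1) {
                window.removeFirst(); // evict the row that left the ROWS bound
            }
            int sum = 0;
            for (int v : new HashSet<>(window)) { // deduplicate before aggregating
                sum += v;
            }
            out.add(sum);
        }
        return out;
    }
}
```

For input `b = [1, 1, 2, 3]` with 2 preceding rows, the per-row windows are `{1}`, `{1,1}`, `{1,1,2}`, `{1,2,3}`, so the distinct sums are `1, 1, 3, 6` — the duplicate `1` in the second and third windows is counted once.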
[jira] [Updated] (FLINK-6357) ParameterTool get unrequested parameters
[ https://issues.apache.org/jira/browse/FLINK-6357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-6357: -- Summary: ParameterTool get unrequested parameters (was: Parametertool get unrequested parameters) > ParameterTool get unrequested parameters > > > Key: FLINK-6357 > URL: https://issues.apache.org/jira/browse/FLINK-6357 > Project: Flink > Issue Type: Improvement > Components: Java API >Affects Versions: 1.3.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > The Gelly examples use {{ParameterTool}} to parse required and optional > parameters. In the latter case we should detect if a user mistypes a > parameter name. I would like to add a {{Set<String> > getUnrequestedParameters()}} method returning parameter names not requested > by {{has}} or any of the {{get}} methods.
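A minimal sketch of the proposed tracking, assuming every lookup is recorded in a set. The class `TrackingParameters` is hypothetical; the real change would live inside `ParameterTool` itself.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for ParameterTool with request tracking. */
public class TrackingParameters {
    private final Map<String, String> data;
    private final Set<String> requested = new HashSet<>();

    public TrackingParameters(Map<String, String> data) {
        this.data = new HashMap<>(data);
    }

    public boolean has(String key) {
        requested.add(key); // record every lookup, hit or miss
        return data.containsKey(key);
    }

    public String get(String key) {
        requested.add(key);
        return data.get(key);
    }

    /** Parameter names never touched by has() or get() — likely user typos. */
    public Set<String> getUnrequestedParameters() {
        Set<String> unrequested = new HashSet<>(data.keySet());
        unrequested.removeAll(requested);
        return unrequested;
    }
}
```

A mistyped option such as `--paralelism 4` would then survive to the end of the program as an unrequested parameter, where it can be reported to the user.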
[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985208#comment-15985208 ] ASF GitHub Bot commented on FLINK-6250: --- Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3771 @fhueske I have created #3783 with just the code generation part. At least the GROUP BY distinct can move ahead. I will close this PR and wait for the merging of the Calcite fix.
[jira] [Updated] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance
[ https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-3722: -- Fix Version/s: 1.3.0
[jira] [Closed] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance
[ https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan closed FLINK-3722. - Resolution: Implemented Implemented in 336b95d4eedc23e5ce37d1739165157e127c65f8
[jira] [Commented] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance
[ https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985203#comment-15985203 ] ASF GitHub Bot commented on FLINK-3722: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2628
[jira] [Commented] (FLINK-6338) SimpleStringUtils should be called StringValueUtils
[ https://issues.apache.org/jira/browse/FLINK-6338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985198#comment-15985198 ] ASF GitHub Bot commented on FLINK-6338: --- Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3783 @fhueske please have a look at this PR, it contains just the code generation part with optional distinct. > SimpleStringUtils should be called StringValueUtils > --- > > Key: FLINK-6338 > URL: https://issues.apache.org/jira/browse/FLINK-6338 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.2.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Trivial > Fix For: 1.3.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6338) SimpleStringUtils should be called StringValueUtils
[ https://issues.apache.org/jira/browse/FLINK-6338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985197#comment-15985197 ] ASF GitHub Bot commented on FLINK-6338: --- GitHub user huawei-flink opened a pull request: https://github.com/apache/flink/pull/3783 [FLINK-6338] Add support for DISTINCT into Code Generated Aggregations Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/stefanobortoli/flink FLINK-6338 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3783.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3783 commit 55e0a8d38187bef22b6135db7f4a5c1cc8f15811 Author: Stefano Bortoli Date: 2017-04-26T17:22:04Z Added code generation distinct aggregation logic
[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985190#comment-15985190 ] ASF GitHub Bot commented on FLINK-6013: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r113512176 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Metric Reporter for Datadog + * + * Variables in metrics scope will be sent to Datadog as tags + * */ +public class DatadogHttpReporter implements MetricReporter, Scheduled { + private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class); + + // Both Flink's Gauge and Meter values are taken as gauge in Datadog + private final Mapgauges = new ConcurrentHashMap<>(); + private final Map counters = new ConcurrentHashMap<>(); + private final Map meters = new ConcurrentHashMap<>(); + + private DatadogHttpClient client; + private List configTags; + + public static final String API_KEY = "apikey"; --- End diff -- I wouldn't worry too much about since 1) the current code works well, and changing it doesn't provide extra readability 2) it requires dependency on flink-core, which is completely unnecessary just because of a config > Add Datadog HTTP metrics reporter > - > > Key: FLINK-6013 > URL: https://issues.apache.org/jira/browse/FLINK-6013 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.3.0 > > > We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a > lot other companies also do. 
> Flink right now only has a StatsD metrics reporter, and users have to set up > Datadog Agent in order to receive metrics from StatsD and transport them to > Datadog. We don't like this approach. > We prefer to have a Datadog metrics reporter directly contacting Datadog http > endpoint. > I'll take this ticket myself. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985159#comment-15985159 ] ASF GitHub Bot commented on FLINK-6013: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r113510125 --- Diff: flink-metrics/flink-metrics-datadog/pom.xml --- @@ -0,0 +1,109 @@ + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + + org.apache.flink + flink-metrics + 1.3-SNAPSHOT + .. + + + flink-metrics-datadog + flink-metrics-datadog + + + + org.apache.flink + flink-metrics-core + ${project.version} + provided + + + + com.fasterxml.jackson.core + jackson-databind + provided + + + + com.squareup.okhttp3 + okhttp + 3.6.0 + + + + com.squareup.okio + okio + 1.11.0 + + + + + + + org.apache.flink + flink-runtime_2.10 + ${project.version} + test + + + + org.apache.flink + flink-test-utils-junit + ${project.version} + test + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + jar-with-dependencies --- End diff -- After removing this line, the shaded jar will be named 'flink-metrics-datadog-1.3-SNAPSHOT-shaded.jar'. Is it what you want?
[jira] [Commented] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator
[ https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985157#comment-15985157 ] ASF GitHub Bot commented on FLINK-6390: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3782 Thanks a lot for the fast review. Agree with both issues raised. Will address them while merging... > Add Trigger Hooks to the Checkpoint Coordinator > --- > > Key: FLINK-6390 > URL: https://issues.apache.org/jira/browse/FLINK-6390 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Some source systems require to be notified prior to starting a checkpoint, in > order to do preparatory work for the checkpoint. > I propose to add an interface to allow sources to register hooks that are > called by the checkpoint coordinator when triggering / restoring a checkpoint. > These hooks may produce state that is stored with the checkpoint metadata. > Envisioned interface for the hooks > {code} > /** > * The interface for hooks that can be called by the checkpoint coordinator > when triggering or > * restoring a checkpoint. Such a hook is useful for example when preparing > external systems for > * taking or restoring checkpoints. > * > * The {@link #triggerCheckpoint(long, long, Executor)} method (called > when triggering a checkpoint) > * can return a result (via a future) that will be stored as part of the > checkpoint metadata. > * When restoring a checkpoint, that stored result will be given to the > {@link #restoreCheckpoint(long, Object)} > * method. The hook's {@link #getIdentifier() identifier} is used to map data > to the hook in the presence > * of multiple hooks, and when resuming a savepoint that was potentially > created by a different job. > * The identifier has a similar role as for example the operator UID in the > streaming API. 
> * > * The MasterTriggerRestoreHook is defined when creating the streaming > dataflow graph. It is attached > * to the job graph, which gets sent to the cluster for execution. To avoid > having to make the hook > * itself serializable, these hooks are attached to the job graph via a > {@link MasterTriggerRestoreHook.Factory}. > * > * @param <T> The type of the data produced by the hook and stored as part of > the checkpoint metadata. > *If the hook never stores any data, this can be typed to {@code > Void}. > */ > public interface MasterTriggerRestoreHook<T> { > /** >* Gets the identifier of this hook. The identifier is used to identify > a specific hook in the >* presence of multiple hooks and to give it the correct checkpointed > data upon checkpoint restoration. >* >* The identifier should be unique between different hooks of a job, > but deterministic/constant >* so that upon resuming a savepoint, the hook will get the correct > data. >* For example, if the hook calls into another storage system and > persists namespace/schema specific >* information, then the name of the storage system, together with the > namespace/schema name could >* be an appropriate identifier. >* >* When multiple hooks of the same name are created and attached to > a job graph, only the first >* one is actually used. This can be exploited to deduplicate hooks > that would do the same thing. >* >* @return The identifier of the hook. >*/ > String getIdentifier(); > /** >* This method is called by the checkpoint coordinator when > triggering a checkpoint, prior >* to sending the "trigger checkpoint" messages to the source tasks. >* >* If the hook implementation wants to store data as part of the > checkpoint, it may return >* that data via a future, otherwise it should return null. The data is > stored as part of >* the checkpoint metadata under the hook's identifier (see {@link > #getIdentifier()}). 
>* >* If the action by this hook needs to be executed synchronously, > then this method should >* directly execute the action synchronously and block until it is > complete. The returned future >* (if any) would typically be a completed future. >* >* If the action should be executed asynchronously and only needs to > complete before the >* checkpoint is considered completed, then the method may use the > given executor to execute the >* actual action and would signal its completion by completing the > future. For hooks that do not >* need to store data, the future would be completed with null. >* >
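As a rough illustration of the hook contract described above, here is a self-contained sketch. The interfaces are simplified stand-ins (the real `MasterTriggerRestoreHook` lives in Flink's runtime); `MasterHook`, `OffsetHook`, and the external-offset scenario are invented for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Simplified stand-in for the quoted hook interface. */
interface MasterHook<T> {
    String getIdentifier();
    CompletableFuture<T> triggerCheckpoint(long checkpointId, long timestamp, Executor executor);
    void restoreCheckpoint(long checkpointId, T checkpointData);
}

/** Hypothetical hook that stores an external system's offset with each checkpoint. */
class OffsetHook implements MasterHook<Long> {
    private volatile long externalOffset = 0; // stands in for state in an external system

    @Override
    public String getIdentifier() {
        // Stable across restarts so restored data is routed back to this hook,
        // much like an operator UID in the streaming API.
        return "external-offset-hook";
    }

    @Override
    public CompletableFuture<Long> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
        // Asynchronous variant: the returned future completes with the data
        // to be stored in the checkpoint metadata under this hook's identifier.
        return CompletableFuture.supplyAsync(() -> externalOffset, executor);
    }

    @Override
    public void restoreCheckpoint(long checkpointId, Long checkpointData) {
        if (checkpointData != null) {
            externalOffset = checkpointData; // reset the external system's position
        }
    }
}
```

A synchronous hook would instead do its work inside `triggerCheckpoint` and return an already completed future, as the javadoc above allows.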
[jira] [Commented] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator
[ https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985154#comment-15985154 ] ASF GitHub Bot commented on FLINK-6390: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/3782#discussion_r113504901 --- Diff: flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.io; + +import java.io.IOException; + +/** + * A simple serializer interface for versioned serialization. + * + * The serializer has a version (returned by {@link #getVersion()}) which can be attached + * to the serialized data. When the serializer evolves, the version can be used to identify + * with which prior version the data was serialized. 
+ *
+ * {@code
+ * MyType someObject = ...;
+ * SimpleVersionedSerializer<MyType> serializer = ...;
+ *
+ * byte[] serializedData = serializer.serialize(someObject);
+ * int version = serializer.getVersion();
+ *
+ * MyType deserialized = serializer.deserialize(version, serializedData);
+ *
+ * byte[] someOldData = ...;
+ * int oldVersion = ...;
+ * MyType deserializedOldObject = serializer.deserialize(oldVersion, someOldData);
+ * }
+ *
+ * @param <E> The data type serialized / deserialized by this serializer.
+ */
+public interface SimpleVersionedSerializer<E> extends Versioned {
+
+	/**
+	 * Gets the version with which this serializer serializes.
+	 *
+	 * @return The version of the serialization schema.
+	 */
+	@Override
+	int getVersion();
+
+	/**
+	 * Serializes the given object. The serialization is assumed to correspond to the
+	 * current serialization version (as returned by {@link #getVersion()}).
+	 *
+	 * @param checkpointData The object to serialize.
+	 * @return The serialized data (bytes).
+	 *
+	 * @throws IOException Thrown, if the serialization fails.
+	 */
+	byte[] serialize(E checkpointData) throws IOException;
--- End diff --
`checkpointData` is maybe a too specific parameter name.
> Add Trigger Hooks to the Checkpoint Coordinator
> --
>
> Key: FLINK-6390
> URL: https://issues.apache.org/jira/browse/FLINK-6390
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Some source systems require to be notified prior to starting a checkpoint, in order to do preparatory work for the checkpoint.
> I propose to add an interface to allow sources to register hooks that are called by the checkpoint coordinator when triggering / restoring a checkpoint.
> These hooks may produce state that is stored with the checkpoint metadata.
> Envisioned interface for the hooks > {code} > /** > * The interface for hooks that can be called by the checkpoint coordinator > when triggering or > * restoring a checkpoint. Such a hook is useful for example when preparing > external systems for > * taking or restoring checkpoints. > * > * The {@link #triggerCheckpoint(long, long, Executor)} method (called > when triggering a checkpoint) > * can return a result (via a future) that will be stored as part of the > checkpoint metadata. > * When restoring a checkpoint, that stored result will be given to the > {@link #restoreCheckpoint(long, Object)} > * method. The hook's {@link #getIdentifier() identifier} is used to map data > to hook in the presence > * of multiple hooks, and when resuming a savepoint that was potentially > created by a different job. > * The identifier has a similar role as for example the operator UID in the > streaming API. > * > * The
[jira] [Commented] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator
[ https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985155#comment-15985155 ] ASF GitHub Bot commented on FLINK-6390: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/3782#discussion_r113507485 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.runtime.checkpoint.hooks;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Collection of methods to deal with checkpoint master hooks.
+ */
+public class MasterHooks {
+
+	// ------------------------------------------------------------------------
+	//  checkpoint triggering
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Triggers all given master hooks and returns state objects for each hook that
+	 * produced a state.
+	 *
+	 * @param hooks The hooks to trigger
+	 * @param checkpointId The checkpoint ID of the triggering checkpoint
+	 * @param timestamp The (informational) timestamp for the triggering checkpoint
+	 * @param executor An executor that can be used for asynchronous I/O calls
+	 * @param timeout The maximum time that a hook may take to complete
+	 *
+	 * @return A list containing all states produced by the hooks
+	 *
+	 * @throws FlinkException Thrown, if the hooks throw an exception, or the state
+	 *                        deserialization fails.
+	 */
+	public static List<MasterState> triggerMasterHooks(
+			Collection<MasterTriggerRestoreHook<?>> hooks,
+			long checkpointId,
+			long timestamp,
+			Executor executor,
+			Time timeout) throws FlinkException {
+
+		final ArrayList<MasterState> states = new ArrayList<>(hooks.size());
+
+		for (MasterTriggerRestoreHook<?> hook : hooks) {
+			MasterState state = triggerHook(hook, checkpointId, timestamp, executor, timeout);
+			if (state != null) {
+				states.add(state);
+			}
+		}
+
+		states.trimToSize();
+		return states;
+	}
+
+	private static <T> MasterState triggerHook(
+			MasterTriggerRestoreHook<?> hook,
+			long checkpointId,
+			long timestamp,
+			Executor executor,
+			Time timeout) throws FlinkException {
+
+		@SuppressWarnings("unchecked")
+		final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook;
+
+		final String id = typedHook.getIdentifier();
+		final SimpleVersionedSerializer<T> serializer = typedHook.createCheckpointDataSerializer();
+
+		// call the hook!
+		final Future<T> resultFuture;
+		try {
+			resultFuture = typedHook.triggerCheckpoint(checkpointId, timestamp, executor);
+		}
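The `triggerMasterHooks` diff above takes a `Time timeout`, but the quoted diff is truncated before the timeout is applied to the hook's future. The sketch below shows, without Flink dependencies, the kind of bounded wait that parameter implies; the method and exception names are illustrative, not the actual MasterHooks implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class HookTimeoutSketch {

    // Wait on a hook's future for at most timeoutMillis; a timeout becomes
    // a descriptive exception carrying the hook's identifier.
    static <T> T awaitHookResult(CompletableFuture<T> resultFuture, String hookId, long timeoutMillis)
            throws Exception {
        try {
            return resultFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new Exception(
                "Checkpoint master hook '" + hookId + "' did not complete in " + timeoutMillis + " ms", e);
        }
    }

    public static void main(String[] args) throws Exception {
        // completed future: result is returned immediately
        System.out.println(awaitHookResult(CompletableFuture.completedFuture("ok"), "demo-hook", 100L));

        // never-completing future: the bounded wait fails with the hook id in the message
        try {
            awaitHookResult(new CompletableFuture<String>(), "stuck-hook", 50L);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
```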
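The versioning scheme of the `SimpleVersionedSerializer` interface reviewed above can be illustrated with a minimal, Flink-free implementation. The class below is a hypothetical example, not Flink code: the current version writes UTF-8, while `deserialize` still understands an imagined older version 1 that used ISO-8859-1.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class VersionedStringSerializer {

    // The version the serializer currently writes with; stored next to the bytes.
    public int getVersion() {
        return 2;
    }

    // serialize() always produces the getVersion() format.
    public byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // deserialize() dispatches on the version the data was written with,
    // so data from older serializer versions remains readable.
    public String deserialize(int version, byte[] data) throws IOException {
        switch (version) {
            case 1:
                return new String(data, StandardCharsets.ISO_8859_1); // legacy format
            case 2:
                return new String(data, StandardCharsets.UTF_8);
            default:
                throw new IOException("Unknown serializer version: " + version);
        }
    }

    public static void main(String[] args) throws IOException {
        VersionedStringSerializer serializer = new VersionedStringSerializer();
        byte[] bytes = serializer.serialize("checkpoint-data");
        String restored = serializer.deserialize(serializer.getVersion(), bytes);
        System.out.println(restored); // -> checkpoint-data
    }
}
```

On restore, the caller passes back the version that was stored alongside the bytes, which is what makes serializer evolution possible.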
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985138#comment-15985138 ] ASF GitHub Bot commented on FLINK-6288: --- Github user gyfora commented on the issue: https://github.com/apache/flink/pull/3766 I think this is reasonable, as the current implementation doesn't work for dynamically created topics. (We should also deprecate the current one.) But let's hear what others say :)
> FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
> --
>
> Key: FLINK-6288
> URL: https://issues.apache.org/jira/browse/FLINK-6288
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Fang Yong
>
> The {{FlinkKafkaProducerBase}} supports routing records to topics besides the default topic, but the custom {{Partitioner}} interface does not follow this semantic.
> The partitioner's {{partition}} method is always invoked with the number of partitions of the default topic, and not the number of partitions of the current {{targetTopic}}.
-- This message was sent by Atlassian JIRA (v6.3.15#6346)
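The behavior FLINK-6288 asks for can be sketched without Kafka: route each record by its key hash modulo the partition count of the *target* topic, rather than the default topic's count. Everything here is illustrative; the partition-count map stands in for the Kafka metadata lookups a real `FlinkKafkaProducer` partitioner would use.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class PerTopicPartitionerSketch {

    // topic -> number of partitions, as it might be refreshed from broker metadata
    private final Map<String, Integer> partitionsPerTopic = new HashMap<>();

    void updatePartitionCount(String topic, int numPartitions) {
        partitionsPerTopic.put(topic, numPartitions);
    }

    // Choose a partition for the record using the target topic's own
    // partition count, not a fixed default-topic count.
    int partition(byte[] key, String targetTopic) {
        // hypothetical fallback policy: treat unknown topics as single-partition
        int numPartitions = partitionsPerTopic.getOrDefault(targetTopic, 1);
        int hash = Arrays.hashCode(key);
        return Math.floorMod(hash, numPartitions); // non-negative partition index
    }

    public static void main(String[] args) {
        PerTopicPartitionerSketch partitioner = new PerTopicPartitionerSketch();
        partitioner.updatePartitionCount("default-topic", 4);
        partitioner.updatePartitionCount("other-topic", 12);
        byte[] key = "user-42".getBytes();
        // Each topic is partitioned against its own count, so the same key
        // may map to different partition indices per topic.
        System.out.println(partitioner.partition(key, "default-topic"));
        System.out.println(partitioner.partition(key, "other-topic"));
    }
}
```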
[jira] [Commented] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator
[ https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985122#comment-15985122 ] Gyula Fora commented on FLINK-6390: --- we could call it completeCheckpoint for example
> Add Trigger Hooks to the Checkpoint Coordinator
> --
>
> Key: FLINK-6390
> URL: https://issues.apache.org/jira/browse/FLINK-6390
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.3.0
[jira] [Commented] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator
[ https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985119#comment-15985119 ] Gyula Fora commented on FLINK-6390: --- Hi Stephan, This looks pretty useful. One thing that came to my mind is whether it makes sense to add a hook for when all tasks have completed their local snapshots but before the full snapshot is completed. (This could be used to implement two-phase commit logic, for instance by backends that present the data externally.) Gyula
> Add Trigger Hooks to the Checkpoint Coordinator
> --
>
> Key: FLINK-6390
> URL: https://issues.apache.org/jira/browse/FLINK-6390
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.3.0
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985117#comment-15985117 ] ASF GitHub Bot commented on FLINK-6225: --- Github user PangZhi commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113505564 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder builder) { private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;"; private static final ArrayList> collection = new ArrayList<>(20); + private static final ArrayList rowCollection = new ArrayList<>(20); --- End diff -- @zentol There is another Row class used here. So need to use qualified name. > Support Row Stream for CassandraSink > > > Key: FLINK-6225 > URL: https://issues.apache.org/jira/browse/FLINK-6225 > Project: Flink > Issue Type: New Feature > Components: Cassandra Connector >Affects Versions: 1.3.0 >Reporter: Jing Fan >Assignee: Haohui Mai > Fix For: 1.3.0 > > > Currently in CassandraSink, specifying query is not supported for row-stream. > The solution should be similar to CassandraTupleSink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
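The review note above is about a name collision: two classes named `Row` are in scope in the Cassandra test, so one must be referred to by its qualified name. The standalone illustration below uses nested classes as stand-ins for `org.apache.flink.types.Row` and the other colliding `Row`; the names are hypothetical.

```java
public class QualifiedRowSketch {

    static class FlinkTypes {          // stands in for org.apache.flink.types
        static class Row {
            final Object[] fields;
            Row(Object... fields) { this.fields = fields; }
        }
    }

    static class CassandraDriver {     // stands in for the other Row's package
        static class Row {
            final String payload;
            Row(String payload) { this.payload = payload; }
        }
    }

    public static void main(String[] args) {
        // Only one Row could be imported by its simple name; the other must
        // be written with its qualifying prefix, exactly as in the review comment.
        FlinkTypes.Row flinkRow = new FlinkTypes.Row("a", 1);
        CassandraDriver.Row driverRow = new CassandraDriver.Row("b");
        System.out.println(flinkRow.fields.length);  // -> 2
        System.out.println(driverRow.payload);       // -> b
    }
}
```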