[GitHub] flink pull request #2673: [FLINK-4864] [table] Shade Calcite dependency in f...
GitHub user wuchong opened a pull request: https://github.com/apache/flink/pull/2673 [FLINK-4864] [table] Shade Calcite dependency in flink-table Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed This can solve version conflicts when users have their own Calcite dependency on the classpath. You can merge this pull request into a Git repository by running: $ git pull https://github.com/wuchong/flink shade-calcite-FLINK-4864 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2673.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2673 commit 61d6c80789cce00f47c04df1e97ed0ea016fcbb3 Author: Jark Wu, Date: 2016-10-21T05:50:21Z [FLINK-4864] [table] Shade Calcite dependency in flink-table
[jira] [Commented] (FLINK-4864) Shade Calcite dependency in flink-table
[ https://issues.apache.org/jira/browse/FLINK-4864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15594176#comment-15594176 ] ASF GitHub Bot commented on FLINK-4864: --- GitHub user wuchong opened a pull request: https://github.com/apache/flink/pull/2673 [FLINK-4864] [table] Shade Calcite dependency in flink-table Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed This can solve version conflicts when users have their own Calcite dependency on the classpath. You can merge this pull request into a Git repository by running: $ git pull https://github.com/wuchong/flink shade-calcite-FLINK-4864 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2673.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2673 commit 61d6c80789cce00f47c04df1e97ed0ea016fcbb3 Author: Jark Wu, Date: 2016-10-21T05:50:21Z [FLINK-4864] [table] Shade Calcite dependency in flink-table > Shade Calcite dependency in flink-table > --- > > Key: FLINK-4864 > URL: https://issues.apache.org/jira/browse/FLINK-4864 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Affects Versions: 1.2.0 > Reporter: Fabian Hueske > Assignee: Jark Wu > > The Table API has a dependency on Apache Calcite. > A user reported version conflicts when having their own Calcite dependency on the classpath. > The solution would be to shade away the Calcite dependency (Calcite's transitive dependencies are already shaded).
[jira] [Commented] (FLINK-4866) Make Trigger.clear() Abstract to Enforce Implementation
[ https://issues.apache.org/jira/browse/FLINK-4866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15594169#comment-15594169 ] ASF GitHub Bot commented on FLINK-4866: --- GitHub user wuchong opened a pull request: https://github.com/apache/flink/pull/2672 [FLINK-4866] [streaming] Make Trigger.clear() Abstract to Enforce Implementation Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed This PR makes the Trigger.clear() method abstract, so that implementors of custom triggers will not forget to clean up their state/timers properly. You can merge this pull request into a Git repository by running: $ git pull https://github.com/wuchong/flink trigger-FLINK-4866 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2672.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2672 commit 313b2a6f1871357b3281e623423f49ef84ad59a1 Author: Jark Wu, Date: 2016-10-21T05:46:45Z [FLINK-4866] [streaming] Make Trigger.clear() Abstract to Enforce Implementation > Make Trigger.clear() Abstract to Enforce Implementation > --- > > Key: FLINK-4866 > URL: https://issues.apache.org/jira/browse/FLINK-4866 > Project: Flink > Issue Type: Bug > Components: Streaming > Reporter: Aljoscha Krettek > Assignee: Jark Wu > > If the method is not abstract, implementors of custom triggers will not realise that it could be necessary, and they will likely not clean up their state/timers properly.
[GitHub] flink pull request #2672: [FLINK-4866] [streaming] Make Trigger.clear() Abst...
GitHub user wuchong opened a pull request: https://github.com/apache/flink/pull/2672 [FLINK-4866] [streaming] Make Trigger.clear() Abstract to Enforce Implementation Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed This PR makes the Trigger.clear() method abstract, so that implementors of custom triggers will not forget to clean up their state/timers properly. You can merge this pull request into a Git repository by running: $ git pull https://github.com/wuchong/flink trigger-FLINK-4866 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2672.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2672 commit 313b2a6f1871357b3281e623423f49ef84ad59a1 Author: Jark Wu, Date: 2016-10-21T05:46:45Z [FLINK-4866] [streaming] Make Trigger.clear() Abstract to Enforce Implementation
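Editor's note: for readers implementing custom triggers, the following is a minimal sketch of what a complete trigger looks like once clear() is mandatory. The CountingTrigger class and its count state are hypothetical examples, not code from this PR; the Trigger and TriggerContext methods used are part of Flink's public trigger API.

{code}
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CountingTrigger extends Trigger<Object, TimeWindow> {

    // Hypothetical per-window state: number of elements seen so far.
    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", (a, b) -> a + b, LongSerializer.INSTANCE);

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(countDesc).add(1L);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    // The method this PR makes abstract: undo everything the trigger set up
    // for the window, i.e. delete the registered timer and clear the state.
    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(countDesc).clear();
    }
}
{code}

The design rationale is that an empty inherited clear() fails silently at runtime, whereas an abstract method turns a forgotten cleanup into a compile error.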
[jira] [Assigned] (FLINK-4864) Shade Calcite dependency in flink-table
[ https://issues.apache.org/jira/browse/FLINK-4864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-4864: -- Assignee: Jark Wu > Shade Calcite dependency in flink-table > --- > > Key: FLINK-4864 > URL: https://issues.apache.org/jira/browse/FLINK-4864 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Affects Versions: 1.2.0 > Reporter: Fabian Hueske > Assignee: Jark Wu > > The Table API has a dependency on Apache Calcite. > A user reported version conflicts when having their own Calcite dependency on the classpath. > The solution would be to shade away the Calcite dependency (Calcite's transitive dependencies are already shaded).
[jira] [Assigned] (FLINK-4866) Make Trigger.clear() Abstract to Enforce Implementation
[ https://issues.apache.org/jira/browse/FLINK-4866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-4866: -- Assignee: Jark Wu > Make Trigger.clear() Abstract to Enforce Implementation > --- > > Key: FLINK-4866 > URL: https://issues.apache.org/jira/browse/FLINK-4866 > Project: Flink > Issue Type: Bug > Components: Streaming > Reporter: Aljoscha Krettek > Assignee: Jark Wu > > If the method is not abstract, implementors of custom triggers will not realise that it could be necessary, and they will likely not clean up their state/timers properly.
[jira] [Commented] (FLINK-4862) NPE on EventTimeSessionWindows with ContinuousEventTimeTrigger
[ https://issues.apache.org/jira/browse/FLINK-4862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15594036#comment-15594036 ] ASF GitHub Bot commented on FLINK-4862: --- GitHub user manuzhang opened a pull request: https://github.com/apache/flink/pull/2671 [FLINK-4862] fix Timer register in ContinuousEventTimeTrigger Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/manuzhang/flink fix_merge_window Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2671.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2671 commit b2370946357044250511e25fce5078812ad22c82 Author: manuzhang, Date: 2016-10-20T07:06:01Z [FLINK-4862] fix Timer register in ContinuousEventTimeTrigger > NPE on EventTimeSessionWindows with ContinuousEventTimeTrigger > -- > > Key: FLINK-4862 > URL: https://issues.apache.org/jira/browse/FLINK-4862 > Project: Flink > Issue Type: Bug > Components: Streaming, Windowing Operators > Reporter: Manu Zhang > > h3. What's the error? > The following NPE is thrown when EventTimeSessionWindows is used with ContinuousEventTimeTrigger. > {code} > Caused by: java.lang.NullPointerException > at org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger.clear(ContinuousEventTimeTrigger.java:91) > at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.clear(WindowOperator.java:768) > at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:310) > at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:297) > at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:196) > at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:297) > at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183) > at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66) > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:271) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:609) > at java.lang.Thread.run(Thread.java:745) > {code} > h3. How to reproduce? > Use {{ContinuousEventTimeTrigger}} instead of the default {{EventTimeTrigger}} in the [SessionWindowing | https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java#L84] example. > h3. What's the cause? > When two session windows are merged, the states of the two {{ContinuousEventTimeTrigger}}s are merged as well, and the new namespace is the merged window. Later, when the context tries to delete the {{Timer}} of the old trigger and looks up the timestamp by the old namespace, a null value is returned.
[GitHub] flink pull request #2671: [FLINK-4862] fix Timer register in ContinuousEvent...
GitHub user manuzhang opened a pull request: https://github.com/apache/flink/pull/2671 [FLINK-4862] fix Timer register in ContinuousEventTimeTrigger Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/manuzhang/flink fix_merge_window Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2671.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2671 commit b2370946357044250511e25fce5078812ad22c82 Author: manuzhang, Date: 2016-10-20T07:06:01Z [FLINK-4862] fix Timer register in ContinuousEventTimeTrigger
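Editor's note: the NPE reported above comes from clear() unconditionally unboxing the {{fireTimestamp}} state, which can already be empty for the old namespace after a merge. Below is a sketch of the kind of null guard that addresses this; it is illustrative only, and the actual change in PR #2671 may differ. Here {{stateDesc}} is assumed to be the trigger's {{ReducingStateDescriptor<Long>}} for the fire timestamp.

{code}
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
    ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
    Long timestamp = fireTimestamp.get();
    // After a merge the timestamp lives under the merged window's namespace,
    // so the lookup under the old namespace can return null.
    if (timestamp != null) {
        ctx.deleteEventTimeTimer(timestamp);
        fireTimestamp.clear();
    }
}
{code}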
[jira] [Closed] (FLINK-4863) states of merging window and trigger are set to different TimeWindows on merge
[ https://issues.apache.org/jira/browse/FLINK-4863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manu Zhang closed FLINK-4863. - Resolution: Duplicate > states of merging window and trigger are set to different TimeWindows on merge > -- > > Key: FLINK-4863 > URL: https://issues.apache.org/jira/browse/FLINK-4863 > Project: Flink > Issue Type: Bug > Components: Streaming, Windowing Operators > Reporter: Manu Zhang > > While the window state is set to the mergeResult's stateWindow (one of the original windows), the trigger state is set to the mergeResult itself. This breaks the {{Timer}} of {{ContinuousEventTimeTrigger}}, since its window cannot be found in the window state.
[jira] [Commented] (FLINK-4863) states of merging window and trigger are set to different TimeWindows on merge
[ https://issues.apache.org/jira/browse/FLINK-4863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15593811#comment-15593811 ] Manu Zhang commented on FLINK-4863: --- Found this is related to FLINK-4862. {{ContinuousEventTimeTrigger}}'s original state ({{fireTimestamp}}) is already cleared in {{context.onMerge(mergeWindows)}}, so the original {{Timer}} cannot be removed by the {{fireTimestamp}} in {{context.clear()}} later. I will close this and try fixing it in FLINK-4862. > states of merging window and trigger are set to different TimeWindows on merge > -- > > Key: FLINK-4863 > URL: https://issues.apache.org/jira/browse/FLINK-4863 > Project: Flink > Issue Type: Bug > Components: Streaming, Windowing Operators > Reporter: Manu Zhang > > While the window state is set to the mergeResult's stateWindow (one of the original windows), the trigger state is set to the mergeResult itself. This breaks the {{Timer}} of {{ContinuousEventTimeTrigger}}, since its window cannot be found in the window state.
[jira] [Commented] (FLINK-4863) states of merging window and trigger are set to different TimeWindows on merge
[ https://issues.apache.org/jira/browse/FLINK-4863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15593797#comment-15593797 ] ASF GitHub Bot commented on FLINK-4863: --- Github user manuzhang closed the pull request at: https://github.com/apache/flink/pull/2666 > states of merging window and trigger are set to different TimeWindows on merge > -- > > Key: FLINK-4863 > URL: https://issues.apache.org/jira/browse/FLINK-4863 > Project: Flink > Issue Type: Bug > Components: Streaming, Windowing Operators > Reporter: Manu Zhang > > While the window state is set to the mergeResult's stateWindow (one of the original windows), the trigger state is set to the mergeResult itself. This breaks the {{Timer}} of {{ContinuousEventTimeTrigger}}, since its window cannot be found in the window state.
[GitHub] flink pull request #2666: [FLINK-4863] fix trigger context window on merge
Github user manuzhang closed the pull request at: https://github.com/apache/flink/pull/2666
[GitHub] flink issue #2617: [FLINK-4705] Instrument FixedLengthRecordSorter
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2617 I have started to take a look at this... The `FixedLengthRecordSorter` and its interactions with the serializers / comparators were not very well tested before, hence not activated. I am trying to double-check that...
[GitHub] flink issue #2618: Refactoring the Continuous File Monitoring Function.
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2618 Thanks for the comments @mxm! I integrated them and am waiting for Travis.
[jira] [Commented] (FLINK-4204) Clean up gelly-examples
[ https://issues.apache.org/jira/browse/FLINK-4204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15593069#comment-15593069 ] ASF GitHub Bot commented on FLINK-4204: --- GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/2670 [FLINK-4204] [gelly] Clean up gelly-examples Moves drivers into a separate package. Adds a default main class that prints a usage listing of the included classes. Includes documentation for running Gelly examples. You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 4204_clean_up_gelly_examples Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2670.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2670 commit 642267c70f362ce5414838aaddbed0dcd6b60934 Author: Greg Hogan, Date: 2016-08-24T15:32:43Z [FLINK-4204] [gelly] Clean up gelly-examples Moves drivers into a separate package. Adds a default main class that prints a usage listing of the included classes. Includes documentation for running Gelly examples. > Clean up gelly-examples > --- > > Key: FLINK-4204 > URL: https://issues.apache.org/jira/browse/FLINK-4204 > Project: Flink > Issue Type: Improvement > Components: Gelly > Affects Versions: 1.1.0 > Reporter: Vasia Kalavri > Assignee: Greg Hogan > > The gelly-examples module has grown quite big (14 examples) and contains several examples that illustrate the same functionality. Examples should help users understand how to use the API and ideally show how to use 1-2 features. Also, it is helpful to state the purpose of each example in the comments. > We should keep the example set small and move everything that does not fit there to the library. > I propose to remove the following: > - ClusteringCoefficient: the functionality already exists as a library method. > - HITS: the functionality already exists as a library method. > - JaccardIndex: the functionality already exists as a library method. > - SingleSourceShortestPaths: the example shows how to use scatter-gather iterations. HITSAlgorithm shows the same feature plus the use of aggregators. I propose we keep this one instead. > - TriangleListing: the functionality already exists as a library method
[GitHub] flink pull request #2670: [FLINK-4204] [gelly] Clean up gelly-examples
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/2670 [FLINK-4204] [gelly] Clean up gelly-examples Moves drivers into a separate package. Adds a default main class that prints a usage listing of the included classes. Includes documentation for running Gelly examples. You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 4204_clean_up_gelly_examples Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2670.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2670 commit 642267c70f362ce5414838aaddbed0dcd6b60934 Author: Greg Hogan, Date: 2016-08-24T15:32:43Z [FLINK-4204] [gelly] Clean up gelly-examples Moves drivers into a separate package. Adds a default main class that prints a usage listing of the included classes. Includes documentation for running Gelly examples.
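Editor's note: a hypothetical sketch of what such a "default main class" printing a usage listing might look like; the class name, jar name, and driver names below are illustrative and not taken from the PR.

{code}
public final class Usage {

    // Example driver listing; the PR's actual set of drivers may differ.
    private static final String[] DRIVERS = {
            "org.apache.flink.graph.drivers.GraphMetrics",
            "org.apache.flink.graph.drivers.JaccardIndex",
    };

    public static void main(String[] args) {
        System.out.println("usage: flink run examples/flink-gelly-examples_<version>.jar <driver> [options]");
        System.out.println();
        System.out.println("available drivers:");
        for (String driver : DRIVERS) {
            System.out.println("  " + driver);
        }
    }
}
{code}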
[GitHub] flink issue #2506: [FLINK-4581] [table] Table API throws "No suitable driver...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2506 +1 to merge
[GitHub] flink pull request #2669: [FLINK-4871] [mini cluster] Add memory calculation...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/2669 [FLINK-4871] [mini cluster] Add memory calculation for TaskManagers to MiniCluster This PR is based on #2651, #2655 and #2657. If the managed memory size for the task manager has not been set in the Configuration, then it is automatically calculated by dividing the available memory by the number of distributed components. Additionally, this PR allows providing a MetricRegistry to the TaskManagerRunner. That way it is possible to use the MiniCluster's MetricRegistry. Add memory calculation for task managers. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink miniClusterUpdate Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2669.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2669 commit 6487af737b57ca16190c0f4a6b63d4afd2af2b06 Author: Till Rohrmann, Date: 2016-10-17T14:22:16Z [FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions Allowing RpcEndpoint.start/shutDown to throw exceptions lets rpc endpoints fail quickly without having to use a callback like the FatalErrorHandler. commit cf7661aafb71649360ad6159f27a82433bfd8f75 Author: Till Rohrmann, Date: 2016-10-17T14:03:02Z [FLINK-4851] [rm] Introduce FatalErrorHandler and MetricRegistry to RM This PR introduces a FatalErrorHandler and the MetricRegistry to the RM. The FatalErrorHandler is used to handle fatal errors. Additionally, the PR adds the MetricRegistry to the RM which can be used to register metrics. Apart from these changes, the PR restructures the code of the RM a little bit and fixes some blocking operations. The PR also moves the TestingFatalErrorHandler into the util package of flink-runtime test, so that it is usable across multiple tests. Introduce ResourceManagerRunner to handle errors in the ResourceManager commit 61d328adb637c44889aa724c270c39657d1289c2 Author: Till Rohrmann, Date: 2016-10-18T16:03:00Z [FLINK-4853] [rm] Clean up job manager registration at the resource manager Introduce the JobLeaderIdService which automatically retrieves the current job leader id. This job leader id is used to validate job manager registration attempts. Additionally, it is used to disconnect old job leaders from the resource manager. Add comments commit 58e8d6c06b55456793d2dffbee309e491d21d309 Author: Till Rohrmann, Date: 2016-10-20T09:07:08Z [FLINK-4871] [mini cluster] Add memory calculation for TaskManagers to MiniCluster If the managed memory size for the task manager has not been set in the Configuration, then it is automatically calculated by dividing the available memory by the number of distributed components. Additionally, this PR allows providing a MetricRegistry to the TaskManagerRunner. That way it is possible to use the MiniCluster's MetricRegistry. Add memory calculation for task managers
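Editor's note: the calculation described above ("dividing the available memory by the number of distributed components") can be sketched roughly as below. This is not the PR's code; the ConfigConstants keys and the EnvironmentInformation helper do exist in flink-runtime, but the method shape is an assumption.

{code}
// Illustrative sketch: managed memory per TaskManager when none is configured.
static long calculateManagedMemoryPerTaskManager(Configuration config, int numComponents) {
    long managedMemoryMB = config.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
    if (managedMemoryMB > 0) {
        return managedMemoryMB << 20; // configured value is in megabytes
    }
    // Otherwise take the managed-memory fraction of the free heap and split it
    // evenly among all components (job managers, task managers, ...) in the JVM.
    float fraction = config.getFloat(
            ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
            ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
    long freeMemory = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
    return (long) (freeMemory * fraction) / numComponents;
}
{code}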
[jira] [Commented] (FLINK-4581) Table API throws "No suitable driver found for jdbc:calcite"
[ https://issues.apache.org/jira/browse/FLINK-4581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592877#comment-15592877 ] ASF GitHub Bot commented on FLINK-4581: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2506 +1 to merge > Table API throws "No suitable driver found for jdbc:calcite" > > > Key: FLINK-4581 > URL: https://issues.apache.org/jira/browse/FLINK-4581 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Reporter: Timo Walther > Assignee: Timo Walther > > It seems that in certain cases the internal Calcite JDBC driver cannot be found. We should either try to get rid of the entire JDBC invocation or fix this bug. > From ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stream-sql-query-in-Flink-tp8928.html > {code} > org.apache.flink.client.program.ProgramInvocationException: The main method caused an error. > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:524) > at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331) > at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253) > at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048) > Caused by: java.lang.RuntimeException: java.sql.SQLException: No suitable driver found for jdbc:calcite: > at org.apache.calcite.tools.Frameworks.withPrepare(Frameworks.java:151) > at org.apache.calcite.tools.Frameworks.withPlanner(Frameworks.java:106) > at org.apache.calcite.tools.Frameworks.withPlanner(Frameworks.java:127) > at org.apache.flink.api.table.FlinkRelBuilder$.create(FlinkRelBuilder.scala:56) > at org.apache.flink.api.table.TableEnvironment.<init>(TableEnvironment.scala:73) > at org.apache.flink.api.table.StreamTableEnvironment.<init>(StreamTableEnvironment.scala:58) > at org.apache.flink.api.java.table.StreamTableEnvironment.<init>(StreamTableEnvironment.scala:45) > at org.apache.flink.api.table.TableEnvironment$.getTableEnvironment(TableEnvironment.scala:376) > at org.apache.flink.api.table.TableEnvironment.getTableEnvironment(TableEnvironment.scala) > at org.myorg.quickstart.ReadingFromKafka2.main(ReadingFromKafka2.java:48) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509) > ... 6 more > Caused by: java.sql.SQLException: No suitable driver found for jdbc:calcite: > at java.sql.DriverManager.getConnection(DriverManager.java:689) > at java.sql.DriverManager.getConnection(DriverManager.java:208) > at org.apache.calcite.tools.Frameworks.withPrepare(Frameworks.java:144) > ... 20 more > {code}
[jira] [Commented] (FLINK-4867) Investigate code generation for improving sort performance
[ https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592866#comment-15592866 ] Stefan Richter commented on FLINK-4867: --- I have already pushed my code to https://github.com/StefanRRichter/RadixSort so that you can take a look. I will do some cleanup, more documentation, and tests if I find some more time to polish this. If you have questions about the code, just drop me an email. > Investigate code generation for improving sort performance > -- > > Key: FLINK-4867 > URL: https://issues.apache.org/jira/browse/FLINK-4867 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime > Reporter: Gabor Gevay > Assignee: Gabor Gevay > Priority: Minor > Labels: performance > > This issue is for investigating whether code generation could speed up sorting. We should make some performance measurements on hand-written code that is similar to what we could generate, to see whether investing more time into this is worth it. If we find that it is worth it, we can open a second Jira for the actual implementation of the code generation. > I think we could generate one class at places where we currently instantiate {{QuickSort}}. This generated class would include the functionality of {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, {{MemorySegment.compare}}, and {{MemorySegment.swap}}. > Btw. I'm planning to give this as a student project at a TU Berlin course in the next few months. > Some concrete ideas about how a generated sorter could be faster than the current sorting code: > * {{MemorySegment.compare}} could be specialized for > ** Length: for small records, the loop could be unrolled > ** Endianness (currently it is optimized for big endian; in the little endian case (e.g. x86) it does a Long.reverseBytes for each long read) > * {{MemorySegment.swapBytes}} > ** In case of small records, using three {{UNSAFE.copyMemory}} is probably not as efficient as a specialized swap, because > *** We could use total loop unrolling in generated code (because we know the exact record size) > *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\] > *** We will only need 2/3 of the memory bandwidth, because the temporary storage could be a register if we swap one byte (or one {{long}}) at a time > ** Several checks might be eliminated > * Better inlining behaviour could be achieved > ** Virtual function calls to the methods of {{InMemorySorter}} could be eliminated. (Note that these are problematic to devirtualize by the JVM if there are different derived classes used in a single Flink job (see \[8,7\]).) > ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the excessive checks make it too large > ** {{MemorySegment.compare}} is probably also not inlined currently, because those two while loops are too large > \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, long, Object, long, long) > \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/ > \[8\] http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/ > \[9\] http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409
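Editor's note: to make the {{MemorySegment.compare}} bullets concrete, here is a hand-written sketch of the specialized form that code generation could emit for a record whose normalized key is exactly 8 bytes. This is illustrative and not Flink code; the generic compare loops over the bytes, whereas a fixed-length specialization needs no loop at all.

{code}
// One read per side, one byte-order normalization, one unsigned compare;
// no loop, no length checks, small enough to inline.
static int compareFixed8(MemorySegment seg1, int offset1, MemorySegment seg2, int offset2) {
    long k1 = Long.reverseBytes(seg1.getLong(offset1)); // restore big-endian key order on x86
    long k2 = Long.reverseBytes(seg2.getLong(offset2));
    return Long.compareUnsigned(k1, k2);
}
{code}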
[jira] [Commented] (FLINK-4705) Instrument FixedLengthRecordSorter
[ https://issues.apache.org/jira/browse/FLINK-4705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592853#comment-15592853 ] ASF GitHub Bot commented on FLINK-4705: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2617 I have started to take a look at this... The `FixedLengthRecordSorter` and its interactions with the serializers / comparators were not very well tested before, hence not activated. I am trying to double-check that... > Instrument FixedLengthRecordSorter > -- > > Key: FLINK-4705 > URL: https://issues.apache.org/jira/browse/FLINK-4705 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime > Affects Versions: 1.2.0 > Reporter: Greg Hogan > Assignee: Greg Hogan > > The {{NormalizedKeySorter}} sorts on the concatenation of (potentially partial) keys plus an 8-byte pointer to the record. After sorting, each pointer must be dereferenced, which is not cache friendly. > The {{FixedLengthRecordSorter}} sorts on the concatenation of full keys followed by the remainder of the record. The records can then be deserialized in sequence. > Instrumenting the {{FixedLengthRecordSorter}} requires implementing the comparator methods {{writeWithKeyNormalization}} and {{readWithKeyNormalization}}. > Testing {{JaccardIndex}} on an m4.16xlarge, the scale 18 runtime dropped from 71.8 to 68.8 s (4.3% faster) and the scale 20 runtime dropped from 546.1 to 501.8 s (8.8% faster).
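Editor's note: for readers unfamiliar with those two comparator methods, this is roughly what they look like for a fixed-length long key; the shift by Long.MIN_VALUE flips the sign bit so that the unsigned byte order of the written value matches the signed sort order. The snippet mirrors Flink's own LongComparator and is shown here only as an illustrative sketch.

{code}
@Override
public void writeWithKeyNormalization(Long record, DataOutputView target) throws IOException {
    // Write the record so its big-endian bytes sort like the signed value.
    target.writeLong(record - Long.MIN_VALUE);
}

@Override
public Long readWithKeyNormalization(DataInputView source) throws IOException {
    // Undo the shift when deserializing the sorted record.
    return source.readLong() + Long.MIN_VALUE;
}
{code}

A comparator that implements this pair also reports true from supportsSerializationWithKeyNormalization(), which is what allows the FixedLengthRecordSorter to be chosen instead of the NormalizedKeySorter.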
[jira] [Created] (FLINK-4873) Add config option to specify "home directory" for YARN client resource sharing
Gyula Fora created FLINK-4873: - Summary: Add config option to specify "home directory" for YARN client resource sharing Key: FLINK-4873 URL: https://issues.apache.org/jira/browse/FLINK-4873 Project: Flink Issue Type: Improvement Components: YARN Client Affects Versions: 1.2.0, 1.1.3 Reporter: Gyula Fora The YARN client currently uses FileSystem.getHomeDirectory() to store the jar files that need to be shared on the cluster. This pretty much forces users to run HDFS or something compatible with the Hadoop FS API on the cluster. In some production environments, file systems (distributed or simply shared) are mounted under the same path and do not require the use of the Hadoop API. If we want to run Flink on YARN in these cases, we need to be able to define the "home directory" where Flink should copy the files for sharing. It could be something like yarn.resource.upload.dir in the flink-conf.yaml.
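Editor's note: nothing below exists in Flink; it is a sketch of how the proposed key could be resolved in the YARN client, falling back to today's behavior when the option is absent.

{code}
// Hypothetical: resolve the upload directory for shared jar files.
org.apache.hadoop.fs.Path getUploadDir(org.apache.flink.configuration.Configuration flinkConfig,
                                       org.apache.hadoop.fs.FileSystem fs) {
    String configuredDir = flinkConfig.getString("yarn.resource.upload.dir", null);
    // Fall back to the current behavior, FileSystem.getHomeDirectory(),
    // when no explicit upload directory is configured.
    return configuredDir != null ? new org.apache.hadoop.fs.Path(configuredDir) : fs.getHomeDirectory();
}
{code}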
[GitHub] flink pull request #2659: [FLINK-4857] Remove throws clause from ZooKeeperUt...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2659
[GitHub] flink issue #2570: [FLINK-3674] Add an interface for Time aware User Functio...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/2570 Rebased version looks good to me.
[GitHub] flink issue #2657: [FLINK-4853] [rm] Harden job manager registration at the ...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2657 Sorry @mxm. The compilation problems were introduced by an incomplete rebase. Should be fixed now.
[jira] [Commented] (FLINK-2491) Operators are not participating in state checkpointing in some cases
[ https://issues.apache.org/jira/browse/FLINK-2491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592479#comment-15592479 ] Eryn Dahlstedt commented on FLINK-2491: --- We have been running into this problem as well. Do you have an example of this workaround? Thanks for any help on this. > Operators are not participating in state checkpointing in some cases > > > Key: FLINK-2491 > URL: https://issues.apache.org/jira/browse/FLINK-2491 > Project: Flink > Issue Type: Bug > Components: Streaming > Affects Versions: 0.10.0 > Reporter: Robert Metzger > Assignee: Márton Balassi > Priority: Critical > Fix For: 1.0.0 > > > While implementing a test case for the Kafka Consumer, I came across the following bug: > Consider the following topology, with the operator parallelism in parentheses: > Source (2) --> Sink (1). > In this setup, the {{snapshotState()}} method is called on the source, but not on the Sink. > The sink receives the generated data. > Only one of the two sources is generating data. > I've implemented a test case for this, you can find it here: > https://github.com/rmetzger/flink/blob/para_checkpoint_bug/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ParallelismChangeCheckpoinedITCase.java
[jira] [Commented] (FLINK-4857) ZooKeeperUtils have a throws exception clause without throwing exceptions
[ https://issues.apache.org/jira/browse/FLINK-4857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592476#comment-15592476 ] ASF GitHub Bot commented on FLINK-4857: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2659 > ZooKeeperUtils have a throws exception clause without throwing exceptions > - > > Key: FLINK-4857 > URL: https://issues.apache.org/jira/browse/FLINK-4857 > Project: Flink > Issue Type: Improvement > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Minor > > Flink's {{ZooKeeperUtils}} contains functions which have a throws clause even though they don't throw an {{Exception}}. This is wrong and should be fixed by removing the throws clauses. > Changing the ZooKeeperUtils will help to properly implement the {{HighAvailabilityServices}} in the flip-6 branch, because the high availability service methods then don't have to throw exceptions.
[jira] [Closed] (FLINK-4857) ZooKeeperUtils have a throws exception clause without throwing exceptions
[ https://issues.apache.org/jira/browse/FLINK-4857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-4857. Resolution: Fixed Fixed via 6f0faf9bb35e7cac3a38ed792cdabd6400fc4c79 > ZooKeeperUtils have a throws exception clause without throwing exceptions > - > > Key: FLINK-4857 > URL: https://issues.apache.org/jira/browse/FLINK-4857 > Project: Flink > Issue Type: Improvement > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Minor > > Flink's {{ZooKeeperUtils}} contains functions which have a throws clause even though they don't throw an {{Exception}}. This is wrong and should be fixed by removing the throws clauses. > Changing the ZooKeeperUtils will help to properly implement the {{HighAvailabilityServices}} in the flip-6 branch, because the high availability service methods then don't have to throw exceptions.
[GitHub] flink issue #2659: [FLINK-4857] Remove throws clause from ZooKeeperUtils fun...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2659 Thanks for the review @mxm. Will merge this PR.
[GitHub] flink issue #2628: [FLINK-3722] [runtime] Don't / and % when sorting
Github user ggevay commented on the issue: https://github.com/apache/flink/pull/2628 > I transcribed Quicksort so as to remove considerations of Java performance and inlining. It was not clear to me that if we encapsulated the index, page number, and page offset into an object that Java would inline the various increment and decrement functions. Also, I don't think this looks too bad. I'm happy to reformat if that is preferred. OK, I would say that it is OK like this, but let's see what the others will say.
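Editor's note: for readers following the thread, the optimization under discussion replaces per-access division and modulo with incremental bookkeeping. An illustrative sketch (not the PR's code):

{code}
final class RecordCursor {
    private final int recordSize;       // fixed number of bytes per record
    private final int lastEntryOffset;  // offset of the last record in a segment
    private int currentSegment;
    private int offset;

    RecordCursor(int recordSize, int recordsPerSegment) {
        this.recordSize = recordSize;
        this.lastEntryOffset = (recordsPerSegment - 1) * recordSize;
    }

    // Instead of recomputing
    //   segment = index / recordsPerSegment
    //   offset  = (index % recordsPerSegment) * recordSize
    // on every access, advance running counters and roll over at segment ends.
    void advance() {
        if (offset < lastEntryOffset) {
            offset += recordSize;
        } else {
            currentSegment++;
            offset = 0;
        }
    }
}
{code}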
[jira] [Commented] (FLINK-4857) ZooKeeperUtils have a throws exception clause without throwing exceptions
[ https://issues.apache.org/jira/browse/FLINK-4857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592452#comment-15592452 ] ASF GitHub Bot commented on FLINK-4857: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2659 Thanks for the review @mxm. Will merge this PR. > ZooKeeperUtils have a throws exception clause without throwing exceptions > - > > Key: FLINK-4857 > URL: https://issues.apache.org/jira/browse/FLINK-4857 > Project: Flink > Issue Type: Improvement > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Minor > > Flink's {{ZooKeeperUtils}} contains functions which have a throws clause even though they don't throw an {{Exception}}. This is wrong and should be fixed by removing the throws clauses. > Changing the ZooKeeperUtils will help to properly implement the {{HighAvailabilityServices}} in the flip-6 branch, because the high availability service methods then don't have to throw exceptions.
[jira] [Created] (FLINK-4872) Type erasure problem exclusively on cluster execution
Martin Junghanns created FLINK-4872: --- Summary: Type erasure problem exclusively on cluster execution Key: FLINK-4872 URL: https://issues.apache.org/jira/browse/FLINK-4872 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.1.2 Reporter: Martin Junghanns The following code runs fine in the local and collection execution environments but fails when executed on a cluster.

{code:title=Problem.java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;

import java.lang.reflect.Array;

public class Problem {

  public static class Pojo { }

  public static class Foo<T> extends Tuple1<T> { }

  public static class Bar<T> extends Tuple1<T[]> { }

  public static class UDF<T> implements MapFunction<Foo<T>, Bar<T>> {

    private final Class<T> clazz;

    public UDF(Class<T> clazz) {
      this.clazz = clazz;
    }

    @Override
    public Bar<T> map(Foo<T> value) throws Exception {
      Bar<T> bar = new Bar<>();
      //noinspection unchecked
      bar.f0 = (T[]) Array.newInstance(clazz, 10);
      return bar;
    }
  }

  public static void main(String[] args) throws Exception {
    // runs in local, collection and cluster execution
    withLong();
    // runs in local and collection execution, fails on cluster execution
    withPojo();
  }

  public static void withLong() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    Foo<Long> foo = new Foo<>();
    foo.f0 = 42L;
    DataSet<Foo<Long>> barDataSource = env.fromElements(foo);
    DataSet<Bar<Long>> map = barDataSource.map(new UDF<>(Long.class));
    map.print();
  }

  public static void withPojo() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    Foo<Pojo> foo = new Foo<>();
    foo.f0 = new Pojo();
    DataSet<Foo<Pojo>> barDataSource = env.fromElements(foo);
    DataSet<Bar<Pojo>> map = barDataSource.map(new UDF<>(Pojo.class));
    map.print();
  }
}
{code}

{code:title=ProblemTest.java}
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class ProblemTest extends MultipleProgramsTestBase {

  public ProblemTest(TestExecutionMode mode) {
    super(mode);
  }

  @Test
  public void testWithLong() throws Exception {
    Problem.withLong();
  }

  @Test
  public void testWithPOJO() throws Exception {
    Problem.withPojo();
  }
}
{code}

Exception:

{code}
The return type of function 'withPojo(Problem.java:58)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
org.apache.flink.api.java.DataSet.collect(DataSet.java:407)
org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
Problem.withPojo(Problem.java:60)
Problem.main(Problem.java:38)
{code}
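Editor's note: as the exception message itself suggests, a type hint is the usual workaround until the extraction issue is fixed. A sketch, assuming the {{TypeHint}}-based {{returns()}} overload available in recent Flink releases:

{code}
import org.apache.flink.api.common.typeinfo.TypeHint;

// Workaround sketch: state the generic return type explicitly so that it
// does not have to be extracted reflectively from the UDF.
DataSet<Bar<Pojo>> map = barDataSource
        .map(new UDF<>(Pojo.class))
        .returns(new TypeHint<Bar<Pojo>>() {});
{code}

Alternatively, the UDF itself could implement ResultTypeQueryable, as the message also notes.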
[jira] [Commented] (FLINK-4853) Clean up JobManager registration at the ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592447#comment-15592447 ] ASF GitHub Bot commented on FLINK-4853: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2657 Sorry @mxm. The compilation problems were introduced by an incomplete rebase. Should be fixed now. > Clean up JobManager registration at the ResourceManager > --- > > Key: FLINK-4853 > URL: https://issues.apache.org/jira/browse/FLINK-4853 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager > Reporter: Till Rohrmann > Assignee: Till Rohrmann > > The current {{JobManager}} registration at the {{ResourceManager}} blocks threads in the {{RpcService.execute}} pool. This is not ideal and can be avoided by not waiting on a {{Future}} in this call. > I propose to encapsulate the leader id retrieval operation in a distinct service so that it can be separated from the {{ResourceManager}}. This will reduce the complexity of the {{ResourceManager}} and make the individual components easier to test.
[jira] [Commented] (FLINK-4871) Add memory calculation for TaskManagers and forward MetricRegistry
[ https://issues.apache.org/jira/browse/FLINK-4871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592437#comment-15592437 ] ASF GitHub Bot commented on FLINK-4871: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/2669 [FLINK-4871] [mini cluster] Add memory calculation for TaskManagers to MiniCluster This PR is based on #2651, #2655 and #2657. If the managed memory size for the task manager has not been set in the Configuration, then it is automatically calculated by dividing the available memory by the number of distributed components. Additionally, this PR allows providing a MetricRegistry to the TaskManagerRunner. That way it is possible to use the MiniCluster's MetricRegistry. Add memory calculation for task managers. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink miniClusterUpdate Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2669.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2669 commit 6487af737b57ca16190c0f4a6b63d4afd2af2b06 Author: Till Rohrmann, Date: 2016-10-17T14:22:16Z [FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions Allowing RpcEndpoint.start/shutDown to throw exceptions lets rpc endpoints fail quickly without having to use a callback like the FatalErrorHandler. commit cf7661aafb71649360ad6159f27a82433bfd8f75 Author: Till Rohrmann, Date: 2016-10-17T14:03:02Z [FLINK-4851] [rm] Introduce FatalErrorHandler and MetricRegistry to RM This PR introduces a FatalErrorHandler and the MetricRegistry to the RM. The FatalErrorHandler is used to handle fatal errors. Additionally, the PR adds the MetricRegistry to the RM which can be used to register metrics. Apart from these changes, the PR restructures the code of the RM a little bit and fixes some blocking operations. The PR also moves the TestingFatalErrorHandler into the util package of flink-runtime test, so that it is usable across multiple tests. Introduce ResourceManagerRunner to handle errors in the ResourceManager commit 61d328adb637c44889aa724c270c39657d1289c2 Author: Till Rohrmann, Date: 2016-10-18T16:03:00Z [FLINK-4853] [rm] Clean up job manager registration at the resource manager Introduce the JobLeaderIdService which automatically retrieves the current job leader id. This job leader id is used to validate job manager registration attempts. Additionally, it is used to disconnect old job leaders from the resource manager. Add comments commit 58e8d6c06b55456793d2dffbee309e491d21d309 Author: Till Rohrmann, Date: 2016-10-20T09:07:08Z [FLINK-4871] [mini cluster] Add memory calculation for TaskManagers to MiniCluster If the managed memory size for the task manager has not been set in the Configuration, then it is automatically calculated by dividing the available memory by the number of distributed components. Additionally, this PR allows providing a MetricRegistry to the TaskManagerRunner. That way it is possible to use the MiniCluster's MetricRegistry. Add memory calculation for task managers > Add memory calculation for TaskManagers and forward MetricRegistry > -- > > Key: FLINK-4871 > URL: https://issues.apache.org/jira/browse/FLINK-4871 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Minor > > Add automatic memory calculation for {{TaskManagers}} executed by the {{MiniCluster}}. > Additionally, change the {{TaskManagerRunner}} to accept a given {{MetricRegistry}} so that the one instantiated by the {{MiniCluster}} is used.
[jira] [Created] (FLINK-4871) Add memory calculation for TaskManagers and forward MetricRegistry
Till Rohrmann created FLINK-4871: Summary: Add memory calculation for TaskManagers and forward MetricRegistry Key: FLINK-4871 URL: https://issues.apache.org/jira/browse/FLINK-4871 Project: Flink Issue Type: Sub-task Reporter: Till Rohrmann Assignee: Till Rohrmann Priority: Minor Add automatic memory calculation for {{TaskManagers}} executed by the {{MiniCluster}}. Additionally, change the {{TaskManagerRunner}} to accept a given {{MetricRegistry}} so that the one instantiated by the {{MiniCluster}} is used.
[GitHub] flink pull request #2668: Add EvaluateDataSetOperation for LabeledVector. Th...
GitHub user tfournier314 opened a pull request: https://github.com/apache/flink/pull/2668 Add EvaluateDataSetOperation for LabeledVector. This closes #4865 Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/tfournier314/flink LabeledDataInEvaluate Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2668.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2668
[jira] [Commented] (FLINK-4867) Investigate code generation for improving sort performance
[ https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592359#comment-15592359 ] Gabor Gevay commented on FLINK-4867: Hm, this sounds quite interesting. Could you please share the code? > As it is radix based and not comparison based, it would require some way to > expose partial sort keys instead of a compareTo method Doesn't the normalized key stuff that we already have solve this part? > Investigate code generation for improving sort performance > -- > > Key: FLINK-4867 > URL: https://issues.apache.org/jira/browse/FLINK-4867 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Gabor Gevay >Priority: Minor > Labels: performance > > This issue is for investigating whether code generation could speed up > sorting. We should make some performance measurements on hand-written code > that is similar to what we could generate, to see whether investing more time > into this is worth it. If we find that it is worth it, we can open a second > Jira for the actual implementation of the code generation. > I think we could generate one class at places where we currently instantiate > {{QuickSort}}. This generated class would include the functionality of > {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, > {{MemorySegment.compare}}, and {{MemorySegment.swap}}. > Btw. I'm planning to give this as a student project at a TU Berlin course in > the next few months. > Some concrete ideas about how a generated sorter could be faster than the > current sorting code: > * {{MemorySegment.compare}} could be specialized for > ** Length: for small records, the loop could be unrolled > ** Endianness (currently it is optimized for big endian; and in the little > endian case (e.g. x86) it does a Long.reverseBytes for each long read) > * {{MemorySegment.swapBytes}} > ** In case of small records, using three {{UNSAFE.copyMemory}} is probably > not as efficient as a specialized swap, because > *** We could use total loop unrolling in generated code (because we know the > exact record size) > *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\] > *** We will only need 2/3 the memory bandwidth, because the temporary storage > could be a register if we swap one byte (or one {{long}}) at a time > ** several checks might be eliminated > * Better inlining behaviour could be achieved > ** Virtual function calls to the methods of {{InMemorySorter}} could be > eliminated. (Note, that these are problematic to devirtualize by the JVM if > there are different derived classes used in a single Flink job (see \[8,7\]).) > ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the > excessive checks make it too large > ** {{MemorySegment.compare}} is probably also not inlined currently, because > those two while loops are too large > \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, > long, Object, long, long) > \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/ > \[8\] > http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/ > \[9\] > http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4867) Investigate code generation for improving sort performance
[ https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay reassigned FLINK-4867: -- Assignee: Gabor Gevay > Investigate code generation for improving sort performance > -- > > Key: FLINK-4867 > URL: https://issues.apache.org/jira/browse/FLINK-4867 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Minor > Labels: performance > > This issue is for investigating whether code generation could speed up > sorting. We should make some performance measurements on hand-written code > that is similar to what we could generate, to see whether investing more time > into this is worth it. If we find that it is worth it, we can open a second > Jira for the actual implementation of the code generation. > I think we could generate one class at places where we currently instantiate > {{QuickSort}}. This generated class would include the functionality of > {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, > {{MemorySegment.compare}}, and {{MemorySegment.swap}}. > Btw. I'm planning to give this as a student project at a TU Berlin course in > the next few months. > Some concrete ideas about how a generated sorter could be faster than the > current sorting code: > * {{MemorySegment.compare}} could be specialized for > ** Length: for small records, the loop could be unrolled > ** Endianness (currently it is optimized for big endian; and in the little > endian case (e.g. x86) it does a Long.reverseBytes for each long read) > * {{MemorySegment.swapBytes}} > ** In case of small records, using three {{UNSAFE.copyMemory}} is probably > not as efficient as a specialized swap, because > *** We could use total loop unrolling in generated code (because we know the > exact record size) > *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\] > *** We will only need 2/3 the memory bandwidth, because the temporary storage > could be a register if we swap one byte (or one {{long}}) at a time > ** several checks might be eliminated > * Better inlining behaviour could be achieved > ** Virtual function calls to the methods of {{InMemorySorter}} could be > eliminated. (Note, that these are problematic to devirtualize by the JVM if > there are different derived classes used in a single Flink job (see \[8,7\]).) > ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the > excessive checks make it too large > ** {{MemorySegment.compare}} is probably also not inlined currently, because > those two while loops are too large > \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, > long, Object, long, long) > \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/ > \[8\] > http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/ > \[9\] > http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4867) Investigate code generation for improving sort performance
[ https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592367#comment-15592367 ] Gabor Gevay commented on FLINK-4867: That's nice, thanks! > Investigate code generation for improving sort performance > -- > > Key: FLINK-4867 > URL: https://issues.apache.org/jira/browse/FLINK-4867 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Minor > Labels: performance > > This issue is for investigating whether code generation could speed up > sorting. We should make some performance measurements on hand-written code > that is similar to what we could generate, to see whether investing more time > into this is worth it. If we find that it is worth it, we can open a second > Jira for the actual implementation of the code generation. > I think we could generate one class at places where we currently instantiate > {{QuickSort}}. This generated class would include the functionality of > {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, > {{MemorySegment.compare}}, and {{MemorySegment.swap}}. > Btw. I'm planning to give this as a student project at a TU Berlin course in > the next few months. > Some concrete ideas about how a generated sorter could be faster than the > current sorting code: > * {{MemorySegment.compare}} could be specialized for > ** Length: for small records, the loop could be unrolled > ** Endianness (currently it is optimized for big endian; and in the little > endian case (e.g. x86) it does a Long.reverseBytes for each long read) > * {{MemorySegment.swapBytes}} > ** In case of small records, using three {{UNSAFE.copyMemory}} is probably > not as efficient as a specialized swap, because > *** We could use total loop unrolling in generated code (because we know the > exact record size) > *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\] > *** We will only need 2/3 the memory bandwidth, because the temporary storage > could be a register if we swap one byte (or one {{long}}) at a time > ** several checks might be eliminated > * Better inlining behaviour could be achieved > ** Virtual function calls to the methods of {{InMemorySorter}} could be > eliminated. (Note, that these are problematic to devirtualize by the JVM if > there are different derived classes used in a single Flink job (see \[8,7\]).) > ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the > excessive checks make it too large > ** {{MemorySegment.compare}} is probably also not inlined currently, because > those two while loops are too large > \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, > long, Object, long, long) > \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/ > \[8\] > http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/ > \[9\] > http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2821) Change Akka configuration to allow accessing actors from different URLs
[ https://issues.apache.org/jira/browse/FLINK-2821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592357#comment-15592357 ] Philipp von dem Bussche commented on FLINK-2821: Thank you for all your effort [~mxm]. I have tested this and was able to connect a jobmanager and a task manager in a docker-machine environment on my Mac as well as in Rancher. For the Rancher setup to work, though, I had to set the bind address to 0.0.0.0. I think this makes sense since Rancher introduces this additional 10.x address (on top of the 172.x address given by Docker), but when specifying the hostname as bind address it would only bind to the 172.x address. One other thing I realized was that my local flink cli on my Mac would no longer work together with your custom build because of version discrepancies. I felt this is quite harsh given that I am running 1.1.3 on both sides, but obviously different builds. I will play around with this a bit more, send some data across, and let you know if I see anything else popping up. Thanks again! > Change Akka configuration to allow accessing actors from different URLs > --- > > Key: FLINK-2821 > URL: https://issues.apache.org/jira/browse/FLINK-2821 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Reporter: Robert Metzger >Assignee: Maximilian Michels > > Akka expects the actor's URL to be exactly matching. > As pointed out here, cases where users were complaining about this: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-trying-to-access-JM-through-proxy-td3018.html > - Proxy routing (as described here, send to the proxy URL, receiver > recognizes only original URL) > - Using hostname / IP interchangeably does not work (we solved this by > always putting IP addresses into URLs, never hostnames) > - Binding to multiple interfaces (any local 0.0.0.0) does not work. Still > no solution to that (but seems not too much of a restriction) > I am aware that this is not possible due to Akka, so it is actually not a > Flink bug. But I think we should track the resolution of the issue here > anyways because it's affecting our users' satisfaction. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4870) ContinuousFileMonitoringFunction does not properly handle absolute Windows paths
Chesnay Schepler created FLINK-4870: --- Summary: ContinuousFileMonitoringFunction does not properly handle absolute Windows paths Key: FLINK-4870 URL: https://issues.apache.org/jira/browse/FLINK-4870 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 1.1.2 Reporter: Chesnay Schepler Priority: Minor Fix For: 1.2.0 The ContinuousFileMonitoringFunction fails for absolute Windows paths without a dedicated scheme (e.g. "C:\\tmp\\test.csv"), since the String path is directly fed into the URI constructor (which doesn't handle it properly) instead of first creating a Flink Path and converting that into a URI. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
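For illustration, a self-contained sketch of the failure mode (this is not the actual patch; the suggested fix via Flink's Path is only hinted at in the comments):

{code}
import java.net.URI;
import java.net.URISyntaxException;

// A raw absolute Windows path is not a valid URI, so feeding the String
// directly into the URI constructor fails.
public class WindowsPathExample {
    public static void main(String[] args) {
        String path = "C:\\tmp\\test.csv";
        try {
            URI uri = new URI(path); // backslashes are illegal URI characters
            System.out.println(uri);
        } catch (URISyntaxException e) {
            // This is the failure the ticket describes. The suggested fix is
            // to build a Flink Path first, e.g. new Path(path).toUri(), which
            // normalizes the Windows path into a proper file URI.
            System.out.println("URI constructor rejects the raw path: " + e.getMessage());
        }
    }
}
{code}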
[jira] [Updated] (FLINK-4869) Store record pointer after record keys
[ https://issues.apache.org/jira/browse/FLINK-4869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-4869: --- Labels: performance (was: ) > Store record pointer after record keys > -- > > Key: FLINK-4869 > URL: https://issues.apache.org/jira/browse/FLINK-4869 > Project: Flink > Issue Type: Sub-task > Components: Core >Affects Versions: 1.2.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > Labels: performance > > {{NormalizedKeySorter}} serializes records into a {{RandomAccessInputView}} > separate from the memory segments used for the sort keys. By storing the > pointer after the sort keys, the addition of the offset is moved from > {{NormalizedKeySorter.compare}}, which is O(n log n), to other methods, which > are O\(n). > I will run a performance comparison before submitting a PR to see how significant a > performance improvement this would yield. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
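To visualize where the offset addition moves, a rough Java sketch of the two entry layouts (the field widths and helper names below are assumptions for illustration, not the actual sorter code):

{code}
// Today an entry is laid out as [8-byte pointer | normalized key], so every
// compare() in the O(n log n) sort loop adds POINTER_LEN to reach the key.
// With [normalized key | 8-byte pointer], compare() starts at the entry
// offset directly and only the O(n) read/write paths locate the pointer.
final class EntryLayoutSketch {
    static final int POINTER_LEN = 8;
    static final int KEY_LEN = 16; // assumed fixed key width for this sketch

    // current layout: addition in the hot compare path
    static int keyOffsetPointerFirst(int entryOffset) {
        return entryOffset + POINTER_LEN;
    }

    // proposed layout: compare reads the key directly...
    static int keyOffsetKeyFirst(int entryOffset) {
        return entryOffset;
    }

    // ...and the addition moves to the O(n) methods that follow the pointer
    static int pointerOffsetKeyFirst(int entryOffset) {
        return entryOffset + KEY_LEN;
    }
}
{code}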
[jira] [Updated] (FLINK-4860) Sort performance
[ https://issues.apache.org/jira/browse/FLINK-4860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-4860: --- Component/s: Local Runtime > Sort performance > > > Key: FLINK-4860 > URL: https://issues.apache.org/jira/browse/FLINK-4860 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: Greg Hogan > Labels: performance > > A super-task for improvements to Flink's sort performance. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4860) Sort performance
[ https://issues.apache.org/jira/browse/FLINK-4860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-4860: --- Labels: performance (was: ) > Sort performance > > > Key: FLINK-4860 > URL: https://issues.apache.org/jira/browse/FLINK-4860 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: Greg Hogan > Labels: performance > > A super-task for improvements to Flink's sort performance. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4869) Store record pointer after record keys
Greg Hogan created FLINK-4869: - Summary: Store record pointer after record keys Key: FLINK-4869 URL: https://issues.apache.org/jira/browse/FLINK-4869 Project: Flink Issue Type: Sub-task Components: Core Affects Versions: 1.2.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor {{NormalizedKeySorter}} serializes records into a {{RandomAccessInputView}} separate from the memory segments used for the sort keys. By storing the pointer after the sort keys, the addition of the offset is moved from {{NormalizedKeySorter.compare}}, which is O(n log n), to other methods, which are O\(n). I will run a performance comparison before submitting a PR to see how significant a performance improvement this would yield. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4867) Investigate code generation for improving sort performance
[ https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592292#comment-15592292 ] Stefan Richter commented on FLINK-4867: --- If sort performance is crucial to you, I wrote an in-place radix sort algorithm that was extremely fast for me in a previous project. On primitive types and serialized byte strings I found it typically a factor of 2-3x faster than JDK's Arrays.sort() for primitives, or than multikey quicksort for Strings. I was considering porting it to Flink, but did not find the time yet. As it is radix based and not comparison based, it would require some way to expose partial sort keys instead of a compareTo method. If that is interesting to you, let me know and I can share the original code. > Investigate code generation for improving sort performance > -- > > Key: FLINK-4867 > URL: https://issues.apache.org/jira/browse/FLINK-4867 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Gabor Gevay >Priority: Minor > Labels: performance > > This issue is for investigating whether code generation could speed up > sorting. We should make some performance measurements on hand-written code > that is similar to what we could generate, to see whether investing more time > into this is worth it. If we find that it is worth it, we can open a second > Jira for the actual implementation of the code generation. > I think we could generate one class at places where we currently instantiate > {{QuickSort}}. This generated class would include the functionality of > {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, > {{MemorySegment.compare}}, and {{MemorySegment.swap}}. > Btw. I'm planning to give this as a student project at a TU Berlin course in > the next few months. > Some concrete ideas about how a generated sorter could be faster than the > current sorting code: > * {{MemorySegment.compare}} could be specialized for > ** Length: for small records, the loop could be unrolled > ** Endianness (currently it is optimized for big endian; and in the little > endian case (e.g. x86) it does a Long.reverseBytes for each long read) > * {{MemorySegment.swapBytes}} > ** In case of small records, using three {{UNSAFE.copyMemory}} is probably > not as efficient as a specialized swap, because > *** We could use total loop unrolling in generated code (because we know the > exact record size) > *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\] > *** We will only need 2/3 the memory bandwidth, because the temporary storage > could be a register if we swap one byte (or one {{long}}) at a time > ** several checks might be eliminated > * Better inlining behaviour could be achieved > ** Virtual function calls to the methods of {{InMemorySorter}} could be > eliminated. (Note, that these are problematic to devirtualize by the JVM if > there are different derived classes used in a single Flink job (see \[8,7\]).) 
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the > excessive checks make it too large > ** {{MemorySegment.compare}} is probably also not inlined currently, because > those two while loops are too large > \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, > long, Object, long, long) > \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/ > \[8\] > http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/ > \[9\] > http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
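While the original code is not shared here, a compact LSD radix sort over int keys shows the general idea of sorting by key bytes ("partial sort keys") rather than by compareTo. This is a generic textbook sketch in Java, not the in-place algorithm mentioned in the comment:

{code}
import java.util.Arrays;

// Generic LSD radix sort on int keys: one stable counting pass per key byte.
// Sign handling: flip the sign bit in the most significant pass so that
// negative values order before positive ones.
public final class RadixSortSketch {
    public static void sort(int[] a) {
        int[] buf = new int[a.length];
        for (int shift = 0; shift < 32; shift += 8) {
            int[] count = new int[257];
            for (int v : a) {
                count[bucket(v, shift) + 1]++;
            }
            for (int i = 0; i < 256; i++) {
                count[i + 1] += count[i];     // prefix sums: bucket start indices
            }
            for (int v : a) {
                buf[count[bucket(v, shift)]++] = v;  // stable placement
            }
            System.arraycopy(buf, 0, a, 0, a.length);
        }
    }

    private static int bucket(int v, int shift) {
        int b = (v >>> shift) & 0xFF;
        return shift == 24 ? b ^ 0x80 : b;    // sign-bit flip in the last pass
    }

    public static void main(String[] args) {
        int[] a = {5, -3, 42, 0, -3, 7};
        sort(a);
        System.out.println(Arrays.toString(a)); // [-3, -3, 0, 5, 7, 42]
    }
}
{code}

A sorter integrated with Flink would additionally need an interface exposing the key bytes of a record, which is exactly the "partial sort keys" requirement the comment raises.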
[jira] [Comment Edited] (FLINK-4867) Investigate code generation for improving sort performance
[ https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592292#comment-15592292 ] Stefan Richter edited comment on FLINK-4867 at 10/20/16 4:32 PM: - If sort performance is crucial to you, I wrote an in-place radix sort algorithm that was extremely fast for me in a previous project. On primitive types and serialized byte strings I found it typically a factor of 2-3x faster than JDK's Arrays.sort() for primitives, or than multikey quicksort for Strings. I was considering porting it to Flink, but did not find the time yet. As it is radix based and not comparison based, it would require some way to expose partial sort keys instead of a compareTo method. If that is interesting to you, let me know and I can share the original code. was (Author: srichter): If you sort performance is crucial to you, I wrote some inplace radix sort algorithm that was extremely fast for me in a precious project. On primitives types and serialized byte strings I found it typically factor 2-3x faster than JDK's Arrays.sort() for primitives or multikey quicksort for Strings). I was considering to port it onto Flink, but did not find the time yet. As it is radix based and not comparison based, it would require some way to expose partial sort keys instead of a compareTo method . If that is interesting to you let me know and I can share the original code. > Investigate code generation for improving sort performance > -- > > Key: FLINK-4867 > URL: https://issues.apache.org/jira/browse/FLINK-4867 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Gabor Gevay >Priority: Minor > Labels: performance > > This issue is for investigating whether code generation could speed up > sorting. We should make some performance measurements on hand-written code > that is similar to what we could generate, to see whether investing more time > into this is worth it. If we find that it is worth it, we can open a second > Jira for the actual implementation of the code generation. > I think we could generate one class at places where we currently instantiate > {{QuickSort}}. This generated class would include the functionality of > {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, > {{MemorySegment.compare}}, and {{MemorySegment.swap}}. > Btw. I'm planning to give this as a student project at a TU Berlin course in > the next few months. > Some concrete ideas about how a generated sorter could be faster than the > current sorting code: > * {{MemorySegment.compare}} could be specialized for > ** Length: for small records, the loop could be unrolled > ** Endianness (currently it is optimized for big endian; and in the little > endian case (e.g. x86) it does a Long.reverseBytes for each long read) > * {{MemorySegment.swapBytes}} > ** In case of small records, using three {{UNSAFE.copyMemory}} is probably > not as efficient as a specialized swap, because > *** We could use total loop unrolling in generated code (because we know the > exact record size) > *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\] > *** We will only need 2/3 the memory bandwidth, because the temporary storage > could be a register if we swap one byte (or one {{long}}) at a time > ** several checks might be eliminated > * Better inlining behaviour could be achieved > ** Virtual function calls to the methods of {{InMemorySorter}} could be > eliminated. 
(Note, that these are problematic to devirtualize by the JVM if > there are different derived classes used in a single Flink job (see \[8,7\]).) > ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the > excessive checks make it too large > ** {{MemorySegment.compare}} is probably also not inlined currently, because > those two while loops are too large > \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, > long, Object, long, long) > \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/ > \[8\] > http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/ > \[9\] > http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592246#comment-15592246 ] ASF GitHub Bot commented on FLINK-3674: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/2570 Rebased version looks good to me. > Add an interface for Time aware User Functions > -- > > Key: FLINK-3674 > URL: https://issues.apache.org/jira/browse/FLINK-3674 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek > > I suggest adding an interface that UDFs can implement, which will let them be > notified upon watermark updates. > Example usage: > {code} > public interface EventTimeFunction { > void onWatermark(Watermark watermark); > } > public class MyMapper implements MapFunction<String, String>, > EventTimeFunction { > private long currentEventTime = Long.MIN_VALUE; > public String map(String value) { > return value + " @ " + currentEventTime; > } > public void onWatermark(Watermark watermark) { > currentEventTime = watermark.getTimestamp(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4867) Investigate code generation for improving sort performance
[ https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592239#comment-15592239 ] Greg Hogan commented on FLINK-4867: --- It will be very interesting to see the results of this project. Perhaps you should self-assign the ticket until it can be handed over? Inline status is logged with the JVM arguments {{-XX:+UnlockDiagnosticVMOptions -XX:+PrintInlining}}. > Investigate code generation for improving sort performance > -- > > Key: FLINK-4867 > URL: https://issues.apache.org/jira/browse/FLINK-4867 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Gabor Gevay >Priority: Minor > Labels: performance > > This issue is for investigating whether code generation could speed up > sorting. We should make some performance measurements on hand-written code > that is similar to what we could generate, to see whether investing more time > into this is worth it. If we find that it is worth it, we can open a second > Jira for the actual implementation of the code generation. > I think we could generate one class at places where we currently instantiate > {{QuickSort}}. This generated class would include the functionality of > {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, > {{MemorySegment.compare}}, and {{MemorySegment.swap}}. > Btw. I'm planning to give this as a student project at a TU Berlin course in > the next few months. > Some concrete ideas about how a generated sorter could be faster than the > current sorting code: > * {{MemorySegment.compare}} could be specialized for > ** Length: for small records, the loop could be unrolled > ** Endianness (currently it is optimized for big endian; and in the little > endian case (e.g. x86) it does a Long.reverseBytes for each long read) > * {{MemorySegment.swapBytes}} > ** In case of small records, using three {{UNSAFE.copyMemory}} is probably > not as efficient as a specialized swap, because > *** We could use total loop unrolling in generated code (because we know the > exact record size) > *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\] > *** We will only need 2/3 the memory bandwidth, because the temporary storage > could be a register if we swap one byte (or one {{long}}) at a time > ** several checks might be eliminated > * Better inlining behaviour could be achieved > ** Virtual function calls to the methods of {{InMemorySorter}} could be > eliminated. (Note, that these are problematic to devirtualize by the JVM if > there are different derived classes used in a single Flink job (see \[8,7\]).) > ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the > excessive checks make it too large > ** {{MemorySegment.compare}} is probably also not inlined currently, because > those two while loops are too large > \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, > long, Object, long, long) > \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/ > \[8\] > http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/ > \[9\] > http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4868) Insertion sort could avoid the swaps
[ https://issues.apache.org/jira/browse/FLINK-4868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-4868: --- Description: This is about the fallback to insertion sort at the beginning of {{QuickSort.sortInternal}}. It is quite hot code, as it runs every time we are at the bottom of the quick sort recursion tree. The inner loop does a series of swaps on adjacent elements for moving a block of several elements one slot to the right and inserting the ith element at the hole. However, it would be faster to first copy the ith element to a temp location, then move the block of elements to the right without swaps, and then copy the original ith element to the hole. Moving the block of elements without swaps could be achieved by calling {{UNSAFE.copyMemory}} only once for every element (as opposed to the three calls in {{MemorySegment.swap}} currently being done). (Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like memcpy, so I'm not sure if we can do the entire block of elements with maybe even one {{UNSAFE.copyMemory}}.) Note that the threshold for switching to the insertion sort could probably be increased after this. was: This is about the fallback to insertion sort at the beginning of {{QuickSort.sortInternal}}. It is quite a hot code as it runs every time when we are at the bottom of the quick sort recursion tree. The inner loop does a series of swaps on adjacent elements for moving a block of several elements one slot to the right and inserting the ith element at the hole. However, it would be faster to first copy the ith element to a temp location, and then move the block of elements to the right without swaps, and then copy the original ith element to the hole. Moving the block of elements without swaps could be achieved by calling {{UNSAFE.copyMemory}} only once for every element (as opposed to the three calls in {{MemorySegment.swap}} currently being done). (Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like memcpy, so I'm not sure if we can do the entire block of elements with maybe even one {{UNSAFE.copyMemory}}.) > Insertion sort could avoid the swaps > > > Key: FLINK-4868 > URL: https://issues.apache.org/jira/browse/FLINK-4868 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Gabor Gevay >Priority: Minor > Labels: performance > > This is about the fallback to insertion sort at the beginning of > {{QuickSort.sortInternal}}. It is quite hot code, as it runs every time > we are at the bottom of the quick sort recursion tree. > The inner loop does a series of swaps on adjacent elements for moving a block > of several elements one slot to the right and inserting the ith element at > the hole. However, it would be faster to first copy the ith element to a temp > location, then move the block of elements to the right without swaps, and > then copy the original ith element to the hole. > Moving the block of elements without swaps could be achieved by calling > {{UNSAFE.copyMemory}} only once for every element (as opposed to the three > calls in {{MemorySegment.swap}} currently being done). > (Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like > memcpy, so I'm not sure if we can do the entire block of elements with maybe > even one {{UNSAFE.copyMemory}}.) > Note that the threshold for switching to the insertion sort could probably be > increased after this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
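For illustration, the described technique on a plain Java array (the real change would operate on MemorySegments; this is only a sketch of the idea):

{code}
// Insertion sort with a "hole": copy the ith element to a temp location,
// shift the larger block one slot to the right with plain moves instead of
// pairwise swaps, then drop the saved element into the hole.
final class InsertionSortSketch {
    static void insertionSort(long[] a, int lo, int hi) {
        for (int i = lo + 1; i < hi; i++) {
            long key = a[i];          // temp location for the ith element
            int j = i - 1;
            while (j >= lo && a[j] > key) {
                a[j + 1] = a[j];      // block move, no swap
                j--;
            }
            a[j + 1] = key;           // fill the hole
        }
    }
}
{code}

On {{MemorySegment}}s the block move would then correspond to one {{UNSAFE.copyMemory}} per element instead of the three calls in {{MemorySegment.swap}}, as the description notes.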
[jira] [Created] (FLINK-4868) Insertion sort could avoid the swaps
Gabor Gevay created FLINK-4868: -- Summary: Insertion sort could avoid the swaps Key: FLINK-4868 URL: https://issues.apache.org/jira/browse/FLINK-4868 Project: Flink Issue Type: Sub-task Components: Local Runtime Reporter: Gabor Gevay Priority: Minor This is about the fallback to insertion sort at the beginning of {{QuickSort.sortInternal}}. It is quite hot code, as it runs every time we are at the bottom of the quick sort recursion tree. The inner loop does a series of swaps on adjacent elements for moving a block of several elements one slot to the right and inserting the ith element at the hole. However, it would be faster to first copy the ith element to a temp location, then move the block of elements to the right without swaps, and then copy the original ith element to the hole. Moving the block of elements without swaps could be achieved by calling {{UNSAFE.copyMemory}} only once for every element (as opposed to the three calls in {{MemorySegment.swap}} currently being done). (Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like memcpy, so I'm not sure if we can do the entire block of elements with maybe even one {{UNSAFE.copyMemory}}.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance
[ https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592186#comment-15592186 ] ASF GitHub Bot commented on FLINK-3722: --- Github user ggevay commented on the issue: https://github.com/apache/flink/pull/2628 > I transcribed Quicksort so as to remove considerations of Java performance and inlining. It was not clear to me that if we encapsulated the index, page number, and page offset into an object that Java would inline the various increment and decrement functions. Also, I don't think this looks too bad. I'm happy to reformat if that is preferred. OK, I would say that it is OK like this, but let's see what the others will say. > The divisions in the InMemorySorters' swap/compare methods hurt performance > --- > > Key: FLINK-3722 > URL: https://issues.apache.org/jira/browse/FLINK-3722 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Gabor Gevay >Assignee: Greg Hogan >Priority: Minor > Labels: performance > > NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods > use divisions (which take a lot of time \[1\]) to calculate the index of the > MemorySegment and the offset inside the segment. [~greghogan] reported on the > mailing list \[2\] measuring a ~12-14% performance effect in one case. > A possibility to improve the situation is the following: > The way that QuickSort mostly uses these compare and swap methods is that it > maintains two indices, and uses them to call compare and swap. The key > observation is that these indices are mostly stepped by one, and > _incrementally_ calculating the quotient and modulo is actually easy when the > index changes only by one: increment/decrement the modulo, and check whether > the modulo has reached 0 or the divisor, and if it did, then wrap-around the > modulo and increment/decrement the quotient. > To implement this, InMemorySorter would have to expose an iterator that would > have the divisor and the current modulo and quotient as state, and have > increment/decrement methods. Compare and swap could then have overloads that > take these iterators as arguments. > \[1\] http://www.agner.org/optimize/instruction_tables.pdf > \[2\] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
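A bare-bones Java sketch of the iterator idea from the description (the class and field names are made up for illustration; the real code operates on the sorter's memory segment indexing):

{code}
// Incrementally maintained (quotient, modulo) pair: stepping the logical
// index by one only bumps the offset and occasionally wraps it, so the hot
// compare/swap path needs no division and no modulo.
final class IncrementalSegmentIndex {
    private final int segmentSize; // the divisor

    int segment; // quotient: which memory segment
    int offset;  // modulo: offset inside the segment

    IncrementalSegmentIndex(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    void increment() {
        if (++offset == segmentSize) { // wrap-around instead of index % size
            offset = 0;
            segment++;
        }
    }

    void decrement() {
        if (--offset < 0) {
            offset = segmentSize - 1;
            segment--;
        }
    }
}
{code}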
[jira] [Created] (FLINK-4867) Investigate code generation for improving sort performance
Gabor Gevay created FLINK-4867: -- Summary: Investigate code generation for improving sort performance Key: FLINK-4867 URL: https://issues.apache.org/jira/browse/FLINK-4867 Project: Flink Issue Type: Sub-task Components: Local Runtime Reporter: Gabor Gevay Priority: Minor This issue is for investigating whether code generation could speed up sorting. We should make some performance measurements on hand-written code that is similar to what we could generate, to see whether investing more time into this is worth it. If we find that it is worth it, we can open a second Jira for the actual implementation of the code generation. I think we could generate one class at places where we currently instantiate {{QuickSort}}. This generated class would include the functionality of {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, {{MemorySegment.compare}}, and {{MemorySegment.swap}}. Btw. I'm planning to give this as a student project at a TU Berlin course in the next few months. Some concrete ideas about how a generated sorter could be faster than the current sorting code: * {{MemorySegment.compare}} could be specialized for ** Length: for small records, the loop could be unrolled ** Endianness (currently it is optimized for big endian; and in the little endian case (e.g. x86) it does a Long.reverseBytes for each long read) * {{MemorySegment.swapBytes}} ** In case of small records, using three {{UNSAFE.copyMemory}} is probably not as efficient as a specialized swap, because *** We could use total loop unrolling in generated code (because we know the exact record size) *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\] *** We will only need 2/3 the memory bandwidth, because the temporary storage could be a register if we swap one byte (or one {{long}}) at a time ** several checks might be eliminated * Better inlining behaviour could be achieved ** Virtual function calls to the methods of {{InMemorySorter}} could be eliminated. (Note, that these are problematic to devirtualize by the JVM if there are different derived classes used in a single Flink job (see \[8,7\]).) ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the excessive checks make it too large ** {{MemorySegment.compare}} is probably also not inlined currently, because those two while loops are too large \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, long, Object, long, long) \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/ \[8\] http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/ \[9\] http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
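To ground the {{MemorySegment.compare}} points, a sketch of what a specialized comparison could collapse to for a known 8-byte key (illustrative only, using a ByteBuffer instead of Flink's MemorySegment):

{code}
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// For a fixed 8-byte normalized key, a generated comparator can replace the
// generic byte-wise while loop with a single read per side. Reading the key
// big-endian and comparing unsigned preserves lexicographic byte order.
public final class SpecializedCompareSketch {
    static int compare8ByteKeys(ByteBuffer a, int offA, ByteBuffer b, int offB) {
        long ka = a.order(ByteOrder.BIG_ENDIAN).getLong(offA);
        long kb = b.order(ByteOrder.BIG_ENDIAN).getLong(offB);
        return Long.compareUnsigned(ka, kb); // one branch instead of a loop
    }

    public static void main(String[] args) {
        ByteBuffer x = ByteBuffer.wrap(new byte[] {0, 0, 0, 0, 0, 0, 0, 1});
        ByteBuffer y = ByteBuffer.wrap(new byte[] {(byte) 0xFF, 0, 0, 0, 0, 0, 0, 0});
        // 0xFF... is greater than 0x00...01 in unsigned byte order
        System.out.println(compare8ByteKeys(x, 0, y, 0) < 0); // true
    }
}
{code}

On little-endian x86 the same idea applies with one Long.reverseBytes per key read, and for longer fixed-size keys the loop could be fully unrolled, which is the specialization the description mentions.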
[GitHub] flink issue #2570: [FLINK-3674] Add an interface for Time aware User Functio...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2570 I updated this on top of the latest master with @StefanRRichter's state changes. Please take another look, @StefanRRichter. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Closed] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-4844. - Resolution: Implemented Fix Version/s: 1.2.0 > Partitionable Raw Keyed/Operator State > -- > > Key: FLINK-4844 > URL: https://issues.apache.org/jira/browse/FLINK-4844 > Project: Flink > Issue Type: New Feature >Reporter: Stefan Richter >Assignee: Stefan Richter > Fix For: 1.2.0 > > > Partitionable operator and keyed state are currently only available by using > backends. However, the serialization code for many operators is built around > reading/writing their state to a stream for checkpointing. We want to provide > partitionable states also through streams, so that migrating existing > operators becomes easier. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4842) Introduce test to enforce order of operator / udf lifecycles
[ https://issues.apache.org/jira/browse/FLINK-4842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-4842. - Resolution: Implemented Fix Version/s: 1.2.0 > Introduce test to enforce order of operator / udf lifecycles > - > > Key: FLINK-4842 > URL: https://issues.apache.org/jira/browse/FLINK-4842 > Project: Flink > Issue Type: Test >Reporter: Stefan Richter >Assignee: Stefan Richter > Fix For: 1.2.0 > > > We should introduce a test that enforces a certain order in which life cycle > methods of operators and udfs are called, so that they are not easily changed > by accident. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2662: [FLINK-4824] [client] CliFrontend shows misleading error ...
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2662 @mxm thanks for the review. I added a second commit which I think satisfies your request. When no job is executed then the message is printed to stderr without a stacktrace. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592051#comment-15592051 ] ASF GitHub Bot commented on FLINK-4844: --- Github user StefanRRichter closed the pull request at: https://github.com/apache/flink/pull/2648 > Partitionable Raw Keyed/Operator State > -- > > Key: FLINK-4844 > URL: https://issues.apache.org/jira/browse/FLINK-4844 > Project: Flink > Issue Type: New Feature >Reporter: Stefan Richter >Assignee: Stefan Richter > > Partitionable operator and keyed state are currently only available by using > backends. However, the serialization code for many operators is built around > reading/writing their state to a stream for checkpointing. We want to provide > partitionable states also through streams, so that migrating existing > operators becomes easier. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...
Github user StefanRRichter closed the pull request at: https://github.com/apache/flink/pull/2648 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592052#comment-15592052 ] ASF GitHub Bot commented on FLINK-3674: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2570 I updated this on top of the latest master with @StefanRRichter's state changes. Please take another look, @StefanRRichter. > Add an interface for Time aware User Functions > -- > > Key: FLINK-3674 > URL: https://issues.apache.org/jira/browse/FLINK-3674 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek > > I suggest adding an interface that UDFs can implement, which will let them be > notified upon watermark updates. > Example usage: > {code} > public interface EventTimeFunction { > void onWatermark(Watermark watermark); > } > public class MyMapper implements MapFunction<String, String>, > EventTimeFunction { > private long currentEventTime = Long.MIN_VALUE; > public String map(String value) { > return value + " @ " + currentEventTime; > } > public void onWatermark(Watermark watermark) { > currentEventTime = watermark.getTimestamp(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4824) CliFrontend shows misleading error message when main() method returns before env.execute()
[ https://issues.apache.org/jira/browse/FLINK-4824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592027#comment-15592027 ] ASF GitHub Bot commented on FLINK-4824: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2662 @mxm thanks for the review. I added a second commit which I think satisfies your request. When no job is executed then the message is printed to stderr without a stacktrace. > CliFrontend shows misleading error message when main() method returns before > env.execute() > -- > > Key: FLINK-4824 > URL: https://issues.apache.org/jira/browse/FLINK-4824 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Greg Hogan > > While testing Flink by running the > {{./examples/streaming/SocketWindowWordCount.jar}} example, I got the > following error message: > {code} > ./bin/flink run ./examples/streaming/SocketWindowWordCount.jar > Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123 > Using address 127.0.0.1:6123 to connect to JobManager. > JobManager web interface address http://127.0.0.1:8081 > Starting execution of program > No port specified. Please run 'SocketWindowWordCount --port <port>', where > port is the address of the text server > To start a simple text server, run 'netcat -l <port>' and type the input text > into the command line > > The program finished with the following exception: > org.apache.flink.client.program.ProgramInvocationException: The program > didn't contain Flink jobs. Perhaps you forgot to call execute() on the > execution environment. > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:324) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:774) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:250) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:985) > at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:1032) > at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:1029) > at > org.apache.flink.runtime.security.SecurityContext$1.run(SecurityContext.java:82) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) > at > org.apache.flink.runtime.security.SecurityContext.runSecured(SecurityContext.java:79) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1029) > {code} > I think the error message is misleading, because I tried executing a valid > Flink job. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591955#comment-15591955 ] Anton Mushin edited comment on FLINK-4832 at 10/20/16 2:31 PM: --- Hello, I think that {{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}} also needs to be changed, because {{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} will only be called if there are elements in inputData. {code} TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig); TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig); for (IN element : inputData) { IN inCopy = inSerializer.copy(element); OUT out = function.map(inCopy); result.add(outSerializer.copy(out)); } {code} And if {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}} is edited, for example as {code} override def initiate(partial: Row): Unit = { partial.setField(sumIndex, 0.asInstanceOf[T]) // cast 0 to the sum type T; the class extends SumAggregate[T] } {code} then the next tests will pass {code} @Test def testSumNullElements(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) val sqlQuery = "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " + "FROM (select * from MyTable where _1 = 4)" val ds = env.fromElements( (1: Byte, 2l,1D,1f,1,1:Short ), (2: Byte, 2l,1D,1f,1,1:Short )) tEnv.registerDataSet("MyTable", ds) val result = tEnv.sql(sqlQuery) val expected = "0,0,0.0,0.0,0,0" val results = result.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } @Test def testCountNullElements(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) val sqlQuery = "SELECT count(_1),count(_2),count(_3),count(_4),count(_5),count(_6) " + "FROM (select * from MyTable where _1 = 4)" val ds = env.fromElements( (1: Byte, 2l,1D,1f,1,1:Short ), (2: Byte, 2l,1D,1f,1,1:Short )) tEnv.registerDataSet("MyTable", ds) val result = tEnv.sql(sqlQuery) val expected = "0,0,0,0,0,0" val results = result.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } {code} was (Author: anmu): Hello I think that it needs to change {{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}} also, because {{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} will be called if elements are in inputData. 
{code} TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig); TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig); for (IN element : inputData) { IN inCopy = inSerializer.copy(element); OUT out = function.map(inCopy); result.add(outSerializer.copy(out)); } {code} And if {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}} will be edited for examle as {code} override def initiate(partial: Row): Unit = { partial.setField(sumIndex, 0.asInstanceOf[T]) //cast 0 to type for sum class is extends SumAggregate[T] } {code} then next test will be passed {code} @Test def testDataSetAggregation(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) val sqlQuery = "SELECT sum(_1) FROM MyTable" val ds = CollectionDataSets.get3TupleDataSet(env) tEnv.registerDataSet("MyTable", ds) val result = tEnv.sql(sqlQuery) val expected = "231" val results = result.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } @Test def testSumNullElements(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) val sqlQuery = "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " + "FROM (select * from MyTable where _1 = 4)" val ds = env.fromElements( (1: Byte, 2l,1D,1f,1,1:Short ), (2: Byte, 2l,1D,1f,1,1:Short )) tEnv.registerDataSet("MyTable", ds) val result = tEnv.sql(sqlQuery) val expected = "0,0,0.0,0.0,0,0" val results = result.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } {code} > Count/Sum 0 elements > > > Key: FLINK-4832 > URL: https://issues.apache.org/jira/browse/FLINK-4832 > Project: Flink >
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84276796 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java --- @@ -43,16 +41,18 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; -import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; +import java.util.PriorityQueue; import java.util.Queue; +import static org.apache.flink.streaming.api.functions.source.RichFileInputSplit.EOS; import static org.apache.flink.util.Preconditions.checkState; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * The operator that reads the {@link FileInputSplit splits} received from the preceding + * The operator that reads the {@link RichFileInputSplit splits} received from the preceding * {@link ContinuousFileMonitoringFunction}. Contrary to the {@link ContinuousFileMonitoringFunction} * which has a parallelism of 1, this operator can have DOP > 1. * --- End diff -- Generic types are not documented in the JavaDoc. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84288975 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java --- @@ -347,34 +328,17 @@ public void run() { } } - private Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException { - List<FileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size()); - for (FileInputSplit split: this.pendingSplits) { - snapshot.add(split); - } - - // remove the current split from the list if inside. - if (this.currentSplit != null && this.currentSplit.equals(pendingSplits.peek())) { - this.pendingSplits.remove(); - } - - if (this.currentSplit != null) { - if (this.format instanceof CheckpointableInputFormat) { - @SuppressWarnings("unchecked") - CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat = - (CheckpointableInputFormat<FileInputSplit, S>) this.format; - - S formatState = this.isSplitOpen ? - checkpointableFormat.getCurrentState() : - restoredFormatState; - return new Tuple3<>(snapshot, currentSplit, formatState); - } else { - LOG.info("The format does not support checkpointing. The current input split will be re-read from start upon recovery."); - return new Tuple3<>(snapshot, currentSplit, null); + private List<RichFileInputSplit> getReaderState() throws IOException { + List<RichFileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size()); + if (currentSplit != null ) { + if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) { + S formatState = ((CheckpointableInputFormat) this.format).getCurrentState(); --- End diff -- ```java Serializable formatState = ((CheckpointableInputFormat) this.format).getCurrentState(); ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591955#comment-15591955 ] Anton Mushin commented on FLINK-4832: - Hello, I think that {{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}} also needs to be changed, because {{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} will only be called if there are elements in inputData. {code} TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig); TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig); for (IN element : inputData) { IN inCopy = inSerializer.copy(element); OUT out = function.map(inCopy); result.add(outSerializer.copy(out)); } {code} And if {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}} is edited, for example as {code} override def initiate(partial: Row): Unit = { partial.setField(sumIndex, 0.asInstanceOf[T]) // cast 0 to the sum type T; the class extends SumAggregate[T] } {code} then the next tests will pass {code} @Test def testDataSetAggregation(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) val sqlQuery = "SELECT sum(_1) FROM MyTable" val ds = CollectionDataSets.get3TupleDataSet(env) tEnv.registerDataSet("MyTable", ds) val result = tEnv.sql(sqlQuery) val expected = "231" val results = result.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } @Test def testSumNullElements(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) val sqlQuery = "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " + "FROM (select * from MyTable where _1 = 4)" val ds = env.fromElements( (1: Byte, 2l,1D,1f,1,1:Short ), (2: Byte, 2l,1D,1f,1,1:Short )) tEnv.registerDataSet("MyTable", ds) val result = tEnv.sql(sqlQuery) val expected = "0,0,0.0,0.0,0,0" val results = result.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } {code} > Count/Sum 0 elements > > > Key: FLINK-4832 > URL: https://issues.apache.org/jira/browse/FLINK-4832 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther > > Currently, the Table API is unable to count or sum up 0 elements. We should > improve DataSet aggregations for this. Maybe by unioning the original DataSet > with a dummy record, or by using a MapPartition function. Coming up with a > good design for this is also part of this issue. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2667#discussion_r84290827

--- Diff: flink-contrib/docker-flink/docker-compose.yml ---
@@ -20,16 +20,20 @@ version: "2"
 services:
   jobmanager:
     image: flink
+    container_name: "jobmanager"
+    expose:
+      - "6123"
     ports:
       - "48081:8081"
     command: jobmanager
-    volumes:
-      - /opt/flink/conf
   taskmanager:
     image: flink
+    expose:
+      - "6121"
+      - "6122"
--- End diff --

Sure, makes sense, since those ports are otherwise not reachable by TaskManagers running in different containers.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user sedgewickmm18 commented on a diff in the pull request: https://github.com/apache/flink/pull/2667#discussion_r84291047 --- Diff: flink-contrib/docker-flink/docker-compose.sh --- @@ -0,0 +1,4 @@ +#!/bin/sh --- End diff -- that's fine --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2628: [FLINK-3722] [runtime] Don't / and % when sorting
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2628

Thanks @ggevay for reviewing. I added a commit with additional comments.

I transcribed `Quicksort` so as to remove considerations of Java performance and inlining. It was not clear to me that, if we encapsulated the index, page number, and page offset into an object, Java would inline the various increment and decrement functions. Also, I don't think this looks too bad. I'm happy to reformat if that is preferred.

I think this is the best time to investigate alternative methods. I'm not seeing how one would sort on top of `InMemorySorter` without deserializing records.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84279968

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
@@ -189,7 +186,7 @@ public void close() throws Exception {
 		output.close();
 	}

-	private class SplitReader extends Thread {
+	private final class SplitReader extends Thread {
--- End diff --

Making private classes final is not really necessary.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84281147 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * An extended {@link FileInputSplit} that also includes information about: + * + * The modification time of the file this split belongs to. + * When checkpointing, the state of the split at the moment of the checkpoint. + * + * This class is used by the {@link ContinuousFileMonitoringFunction} and the + * {@link ContinuousFileReaderOperator} to perform continuous file processing. + * */ +public class RichFileInputSplit + extends FileInputSplit implements Comparable{ + + /** The modification time of the file this split belongs to. */ + private final long modificationTime; + + /** +* The state of the split. This information is used when +* restoring from a checkpoint and allows to resume reading the +* underlying file from the point we left off. +* */ + private S splitState; + + /** A special {@link RichFileInputSplit} signaling the end of the stream of splits.*/ + public static final RichFileInputSplit EOS = + new RichFileInputSplit<>(Long.MIN_VALUE, -1, null, -1, -1, null); --- End diff -- Is it really necessary to have this special split? Couldn't you just have a `reader.stop()` method which stops the reader after the current split has been processed? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
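As background for the sentinel-versus-`stop()` question above: the `EOS` split is the classic "poison pill" for queue consumers. A minimal, self-contained JDK illustration follows; the names are hypothetical and this is not the PR's code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PoisonPillExample {

    // Sentinel instance; compared by identity, so no real element can match it.
    private static final String EOS = new String("EOS");

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> pendingSplits = new ArrayBlockingQueue<>(10);
        pendingSplits.put("split-1");
        pendingSplits.put("split-2");
        pendingSplits.put(EOS); // producer signals: nothing more will arrive

        while (true) {
            String split = pendingSplits.take();
            if (split == EOS) { // identity check on purpose
                break;          // queue fully drained, consumer can shut down
            }
            System.out.println("processing " + split);
        }
    }
}
```

One argument for the sentinel over a `stop()` flag: the sentinel guarantees the reader drains everything queued before it, whereas a flag needs extra care to avoid dropping splits that are still pending.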
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84284953 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RichFileInputSplitTest.java --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.test.checkpointing; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.functions.source.RichFileInputSplit; +import org.junit.Assert; +import org.junit.Test; + +public class RichFileInputSplitTest { + + @Test + public void testSplitEquality() { + + RichFileInputSplit eos1 = RichFileInputSplit.EOS; + RichFileInputSplit eos2 = RichFileInputSplit.EOS; + + Assert.assertEquals(eos1, eos2); + + FileInputSplit firstSplit = new FileInputSplit(2, new Path("test"), 0, 100, null); + RichFileInputSplit richFirstSplit = new RichFileInputSplit(10, firstSplit); + Assert.assertNotEquals(eos1, richFirstSplit); + Assert.assertNotEquals(richFirstSplit, firstSplit); + + FileInputSplit secondSplit = new FileInputSplit(2, new Path("test"), 0, 100, null); + RichFileInputSplit richSecondSplit = new RichFileInputSplit(10, secondSplit); + Assert.assertEquals(richFirstSplit, richSecondSplit); + Assert.assertNotEquals(richFirstSplit, firstSplit); + + FileInputSplit modSecondSplit = new FileInputSplit(2, new Path("test"), 0, 100, null); + RichFileInputSplit richModSecondSplit = new RichFileInputSplit(11, modSecondSplit); + Assert.assertNotEquals(richSecondSplit, richModSecondSplit); + + FileInputSplit thirdSplit = new FileInputSplit(2, new Path("test/test1"), 0, 100, null); + RichFileInputSplit richThirdSplit = new RichFileInputSplit(10, thirdSplit); + Assert.assertEquals(richThirdSplit.getModificationTime(), 10); + Assert.assertNotEquals(richFirstSplit, richThirdSplit); + + FileInputSplit thirdSplitCopy = new FileInputSplit(2, new Path("test/test1"), 0, 100, null); + RichFileInputSplit richThirdSplitCopy = new RichFileInputSplit(10, thirdSplitCopy); + Assert.assertEquals(richThirdSplitCopy, richThirdSplit); + } + + @Test + public void testSplitComparison() { + FileInputSplit firstSplit = new FileInputSplit(3, new Path("test/test1"), 0, 100, null); + RichFileInputSplit richFirstSplit = new RichFileInputSplit(10, firstSplit); + + FileInputSplit secondSplit = new FileInputSplit(2, new Path("test/test2"), 0, 100, null); + RichFileInputSplit richSecondSplit = new RichFileInputSplit(10, secondSplit); + + FileInputSplit thirdSplit = new FileInputSplit(1, new Path("test/test2"), 0, 100, null); + RichFileInputSplit richThirdSplit = new RichFileInputSplit(10, thirdSplit); + + FileInputSplit forthSplit = new FileInputSplit(0, new Path("test/test3"), 0, 100, null); + RichFileInputSplit 
richForthSplit = new RichFileInputSplit(11, forthSplit); + + // lexicographically on the path order + Assert.assertTrue(richFirstSplit.compareTo(richSecondSplit) < 0); + Assert.assertTrue(richFirstSplit.compareTo(richThirdSplit) < 0); + + // same mod time, same file so smaller split number first + Assert.assertTrue(richThirdSplit.compareTo(richSecondSplit) < 0); + + // smaller modification time first + Assert.assertTrue(richThirdSplit.compareTo(richForthSplit) < 0); + } + + @Test + public void testIllegalArgument() { + try { + FileInputSplit firstSplit = new FileInputSplit(2, new Path("test"), 0, 100, null); + new
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84288776 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * An extended {@link FileInputSplit} that also includes information about: + * + * The modification time of the file this split belongs to. + * When checkpointing, the state of the split at the moment of the checkpoint. + * + * This class is used by the {@link ContinuousFileMonitoringFunction} and the + * {@link ContinuousFileReaderOperator} to perform continuous file processing. + * */ +public class RichFileInputSplit + extends FileInputSplit implements Comparable{ + + /** The modification time of the file this split belongs to. */ + private final long modificationTime; + + /** +* The state of the split. This information is used when +* restoring from a checkpoint and allows to resume reading the +* underlying file from the point we left off. +* */ + private S splitState; + + /** A special {@link RichFileInputSplit} signaling the end of the stream of splits.*/ + public static final RichFileInputSplit EOS = + new RichFileInputSplit<>(Long.MIN_VALUE, -1, null, -1, -1, null); + + /** +* Creates a {@link RichFileInputSplit} based on the file modification time and +* the rest of the information of the {@link FileInputSplit}, as returned by the +* underlying filesystem. +* +* @param modificationTime the modification file of the file this split belongs to +* @param split the rest of the information about the split +*/ + public RichFileInputSplit(long modificationTime, FileInputSplit split) { + this(modificationTime, + split.getSplitNumber(), + split.getPath(), + split.getStart(), + split.getLength(), + split.getHostnames()); + } + + /** +* Constructor with the raw split information. 
+* +* @param modificationTime the modification file of the file this split belongs to +* @param numthe number of this input split +* @param file the file name +* @param start the position of the first byte in the file to process +* @param length the number of bytes in the file to process (-1 is flag for "read whole file") +* @param hosts the list of hosts containing the block, possibly null +*/ + private RichFileInputSplit(long modificationTime, int num, Path file, long start, long length, String[] hosts) { + super(num, file, start, length, hosts); + + Preconditions.checkArgument(modificationTime >= 0 || modificationTime == Long.MIN_VALUE, + "Invalid File Split Modification Time: "+ modificationTime +"."); + + this.modificationTime = modificationTime; + } + + /** +* Sets the state of the split. This information is used when +* restoring from a checkpoint and allows to resume reading the +* underlying file from the point we left off. +* +* This is applicable to {@link org.apache.flink.api.common.io.FileInputFormat FileInputFormats} +* that implement the {@link org.apache.flink.api.common.io.CheckpointableInputFormat +* CheckpointableInputFormat} interface. +* */ + public void setSplitState(S state) { + this.splitState = state; + } + + /** +* Sets the state of the split to
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84288567 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * An extended {@link FileInputSplit} that also includes information about: + * + * The modification time of the file this split belongs to. + * When checkpointing, the state of the split at the moment of the checkpoint. + * + * This class is used by the {@link ContinuousFileMonitoringFunction} and the + * {@link ContinuousFileReaderOperator} to perform continuous file processing. + * */ +public class RichFileInputSplit + extends FileInputSplit implements Comparable{ + + /** The modification time of the file this split belongs to. */ + private final long modificationTime; + + /** +* The state of the split. This information is used when +* restoring from a checkpoint and allows to resume reading the +* underlying file from the point we left off. +* */ + private S splitState; --- End diff -- ```java private Serializable splitState; ``` should be sufficient. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84285924 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * An extended {@link FileInputSplit} that also includes information about: + * + * The modification time of the file this split belongs to. + * When checkpointing, the state of the split at the moment of the checkpoint. + * + * This class is used by the {@link ContinuousFileMonitoringFunction} and the + * {@link ContinuousFileReaderOperator} to perform continuous file processing. + * */ +public class RichFileInputSplit --- End diff -- The name rich :) I'd be happy if we could find another name. Rich doesn't really mean anything. How about `TimestampedFileInputSplit`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84288480 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * An extended {@link FileInputSplit} that also includes information about: + * + * The modification time of the file this split belongs to. + * When checkpointing, the state of the split at the moment of the checkpoint. + * + * This class is used by the {@link ContinuousFileMonitoringFunction} and the + * {@link ContinuousFileReaderOperator} to perform continuous file processing. + * */ +public class RichFileInputSplit --- End diff -- I think you can drop the type parameter here since you don't gain any type safety from the parameter. It is never used in any argument which would make it meaningful. Instead just use `Serializable` for the state type. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84285533 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * An extended {@link FileInputSplit} that also includes information about: + * + * The modification time of the file this split belongs to. + * When checkpointing, the state of the split at the moment of the checkpoint. + * + * This class is used by the {@link ContinuousFileMonitoringFunction} and the + * {@link ContinuousFileReaderOperator} to perform continuous file processing. + * */ +public class RichFileInputSplit + extends FileInputSplit implements Comparable{ + + /** The modification time of the file this split belongs to. */ + private final long modificationTime; + + /** +* The state of the split. This information is used when +* restoring from a checkpoint and allows to resume reading the +* underlying file from the point we left off. +* */ + private S splitState; + + /** A special {@link RichFileInputSplit} signaling the end of the stream of splits.*/ + public static final RichFileInputSplit EOS = + new RichFileInputSplit<>(Long.MIN_VALUE, -1, null, -1, -1, null); + + /** +* Creates a {@link RichFileInputSplit} based on the file modification time and +* the rest of the information of the {@link FileInputSplit}, as returned by the +* underlying filesystem. +* +* @param modificationTime the modification file of the file this split belongs to +* @param split the rest of the information about the split +*/ + public RichFileInputSplit(long modificationTime, FileInputSplit split) { --- End diff -- Not sure about this constructor. I think I'd prefer something spelling out the parameters. This also avoids creating a regular FileInputSplit every time. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r84280372

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
@@ -199,44 +196,39 @@ public void close() throws Exception {
 		private final Object checkpointLock;
 		private final SourceFunction.SourceContext<OT> readerContext;

-		private final Queue<FileInputSplit> pendingSplits;
-
-		private FileInputSplit currentSplit = null;
+		private final Queue<RichFileInputSplit<S>> pendingSplits;

-		private S restoredFormatState = null;
+		private RichFileInputSplit<S> currentSplit;

-		private volatile boolean isSplitOpen = false;
+		private volatile boolean isSplitOpen;

 		private SplitReader(FileInputFormat<OT> format,
 					TypeSerializer<OT> serializer,
 					SourceFunction.SourceContext<OT> readerContext,
 					Object checkpointLock,
-					Tuple3<List<FileInputSplit>, FileInputSplit, S> restoredState) {
+					List<RichFileInputSplit<S>> restoredState) {

 			this.format = checkNotNull(format, "Unspecified FileInputFormat.");
 			this.serializer = checkNotNull(serializer, "Unspecified Serializer.");
 			this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context.");
 			this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock.");

-			this.pendingSplits = new ArrayDeque<>();
 			this.isRunning = true;

-			// this is the case where a task recovers from a previous failed attempt
-			if (restoredState != null) {
-				List<FileInputSplit> pending = restoredState.f0;
-				FileInputSplit current = restoredState.f1;
-				S formatState = restoredState.f2;
-
-				for (FileInputSplit split : pending) {
-					pendingSplits.add(split);
+			this.pendingSplits = new PriorityQueue<>(100, new Comparator<RichFileInputSplit<S>>() {
--- End diff --

Why did you choose 100 as the initial size?

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
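The comparator handed to that priority queue is cut off in the diff, but the split-comparison test quoted earlier in this thread pins down the ordering it must implement: smaller modification time first, then lexicographic path order, then smaller split number within the same file. A hypothetical comparator consistent with those assertions (raw type and accessors assumed from `FileInputSplit`; this is a sketch, not the PR's code):

```java
import java.util.Comparator;

// Orders splits by (modification time, path, split number), matching the
// assertions in RichFileInputSplitTest shown above.
class SplitComparator implements Comparator<RichFileInputSplit> {
    @Override
    public int compare(RichFileInputSplit a, RichFileInputSplit b) {
        int byTime = Long.compare(a.getModificationTime(), b.getModificationTime());
        if (byTime != 0) {
            return byTime;
        }
        int byPath = a.getPath().toString().compareTo(b.getPath().toString());
        if (byPath != 0) {
            return byPath;
        }
        return Integer.compare(a.getSplitNumber(), b.getSplitNumber());
    }
}
```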
[jira] [Commented] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance
[ https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591881#comment-15591881 ]

ASF GitHub Bot commented on FLINK-3722:
---------------------------------------

Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2628

Thanks @ggevay for reviewing. I added a commit with additional comments.

I transcribed `Quicksort` so as to remove considerations of Java performance and inlining. It was not clear to me that, if we encapsulated the index, page number, and page offset into an object, Java would inline the various increment and decrement functions. Also, I don't think this looks too bad. I'm happy to reformat if that is preferred.

I think this is the best time to investigate alternative methods. I'm not seeing how one would sort on top of `InMemorySorter` without deserializing records.

> The divisions in the InMemorySorters' swap/compare methods hurt performance
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-3722
>                 URL: https://issues.apache.org/jira/browse/FLINK-3722
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Local Runtime
>            Reporter: Gabor Gevay
>            Assignee: Greg Hogan
>            Priority: Minor
>              Labels: performance
>
> NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods use divisions (which take a lot of time \[1\]) to calculate the index of the MemorySegment and the offset inside the segment. [~greghogan] reported on the mailing list \[2\] measuring a ~12-14% performance effect in one case.
> A possibility to improve the situation is the following:
> The way that QuickSort mostly uses these compare and swap methods is that it maintains two indices, and uses them to call compare and swap. The key observation is that these indices are mostly stepped by one, and _incrementally_ calculating the quotient and modulo is actually easy when the index changes only by one: increment/decrement the modulo, and check whether the modulo has reached 0 or the divisor, and if it did, then wrap-around the modulo and increment/decrement the quotient.
> To implement this, InMemorySorter would have to expose an iterator that would have the divisor and the current modulo and quotient as state, and have increment/decrement methods. Compare and swap could then have overloads that take these iterators as arguments.
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf
> \[2\] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
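The incremental quotient/modulo stepping described in the issue can be sketched in a few lines. This is illustrative Java, not the patch itself: instead of recomputing `index / divisor` and `index % divisor` on every step, the quotient and modulo are carried along and adjusted by one.

```java
public class IncrementalDivMod {

    final int divisor;   // e.g. records per memory segment
    int quotient;        // e.g. which MemorySegment the index points into
    int modulo;          // e.g. record offset within that segment

    IncrementalDivMod(int divisor) {
        this.divisor = divisor;
    }

    void increment() {
        if (++modulo == divisor) { // stepped past the end of the segment
            modulo = 0;
            quotient++;
        }
    }

    void decrement() {
        if (modulo == 0) {         // wrap backwards into the previous segment
            modulo = divisor - 1;
            quotient--;
        } else {
            modulo--;
        }
    }

    // Quick self-check (run with -ea): stepping stays in sync with / and %.
    public static void main(String[] args) {
        IncrementalDivMod it = new IncrementalDivMod(8);
        for (int index = 0; index < 100; index++) {
            assert it.quotient == index / 8 && it.modulo == index % 8;
            it.increment();
        }
        System.out.println("incremental stepping matches / and %");
    }
}
```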
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user sedgewickmm18 commented on a diff in the pull request: https://github.com/apache/flink/pull/2667#discussion_r84287069

--- Diff: flink-contrib/docker-flink/docker-compose.yml ---
@@ -20,16 +20,20 @@ version: "2"
 services:
   jobmanager:
     image: flink
+    container_name: "jobmanager"
+    expose:
+      - "6123"
     ports:
       - "48081:8081"
     command: jobmanager
-    volumes:
-      - /opt/flink/conf
   taskmanager:
     image: flink
+    expose:
+      - "6121"
+      - "6122"
--- End diff --

These lines expose the TaskManager's RPC and data ports to make them accessible in the private subnet; please see https://docs.docker.com/docker-cloud/apps/ports/

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user sedgewickmm18 commented on a diff in the pull request: https://github.com/apache/flink/pull/2667#discussion_r84286390

--- Diff: flink-contrib/docker-flink/docker-entrypoint.sh ---
@@ -20,11 +20,19 @@
 if [ "$1" = "jobmanager" ]; then
     echo "Starting Job Manager"
-    sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: `hostname -f`/g" $FLINK_HOME/conf/flink-conf.yaml
+    #sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: `hostname -f`/g" $FLINK_HOME/conf/flink-conf.yaml
+
+    # make use of container linking and exploit the jobmanager entry in /etc/hosts
+    sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: jobmanager/g" $FLINK_HOME/conf/flink-conf.yaml
+
     sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: `grep -c ^processor /proc/cpuinfo`/g" $FLINK_HOME/conf/flink-conf.yaml
--- End diff --

agree - makes much more sense that way - otherwise it's always one slot by default.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2667#discussion_r84242539

--- Diff: flink-contrib/docker-flink/docker-compose.yml ---
@@ -20,16 +20,20 @@ version: "2"
 services:
   jobmanager:
     image: flink
+    container_name: "jobmanager"
+    expose:
+      - "6123"
     ports:
       - "48081:8081"
     command: jobmanager
-    volumes:
-      - /opt/flink/conf
   taskmanager:
     image: flink
+    expose:
+      - "6121"
+      - "6122"
--- End diff --

What are these ports needed for? The TaskManager will always initiate the connection to the JobManager.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Comment Edited] (FLINK-3902) Discarded FileSystem checkpoints are lingering around
[ https://issues.apache.org/jira/browse/FLINK-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591808#comment-15591808 ] Jan Zajíc edited comment on FLINK-3902 at 10/20/16 1:24 PM: WE have same issue on Linux in docker, but with *both* ! RocksDBBackend and FSStateBackend. On Windows developer local machine it works just fine. was (Author: jan-zajic): WE have same issue on Linux in docker, but *both* ! RocksDBBackend and FSStateBackend. On Windows developer local machine it works just fine. > Discarded FileSystem checkpoints are lingering around > - > > Key: FLINK-3902 > URL: https://issues.apache.org/jira/browse/FLINK-3902 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.0.2 >Reporter: Ufuk Celebi > > A user reported that checkpoints with {{FSStateBackend}} are not properly > cleaned up. > {code} > 2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: > blk_1084791727_11053122 10.10.113.10:50010 > 2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server handler > 9 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from > 10.10.113.9:49233 Call#12337 Retry#0 > org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: > `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non > empty': Directory is not empty > at > org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712) > {code} > {code} > 2016-05-10 12:20:22,636 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 62 @ 1462875622636 > 2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 62 (in 9843 ms) > 2016-05-10 12:20:52,637 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 63 @ 1462875652637 > 2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 63 (in 13909 ms) > 2016-05-10 12:21:22,636 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 64 @ 1462875682636 > {code} > Running the same program with the {{RocksDBBackend}} works as expected and > clears the old checkpoints properly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-3902) Discarded FileSystem checkpoints are lingering around
[ https://issues.apache.org/jira/browse/FLINK-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591808#comment-15591808 ] Jan Zajíc edited comment on FLINK-3902 at 10/20/16 1:24 PM: We have same issue on Linux in docker, but with *both* ! RocksDBBackend and FSStateBackend. On Windows developer local machine it works just fine. was (Author: jan-zajic): WE have same issue on Linux in docker, but with *both* ! RocksDBBackend and FSStateBackend. On Windows developer local machine it works just fine. > Discarded FileSystem checkpoints are lingering around > - > > Key: FLINK-3902 > URL: https://issues.apache.org/jira/browse/FLINK-3902 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.0.2 >Reporter: Ufuk Celebi > > A user reported that checkpoints with {{FSStateBackend}} are not properly > cleaned up. > {code} > 2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: > blk_1084791727_11053122 10.10.113.10:50010 > 2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server handler > 9 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from > 10.10.113.9:49233 Call#12337 Retry#0 > org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: > `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non > empty': Directory is not empty > at > org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712) > {code} > {code} > 2016-05-10 12:20:22,636 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 62 @ 1462875622636 > 2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 62 (in 9843 ms) > 2016-05-10 12:20:52,637 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 63 @ 1462875652637 > 2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 63 (in 13909 ms) > 2016-05-10 12:21:22,636 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 64 @ 1462875682636 > {code} > Running the same program with the {{RocksDBBackend}} works as expected and > clears the old checkpoints properly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3902) Discarded FileSystem checkpoints are lingering around
[ https://issues.apache.org/jira/browse/FLINK-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591808#comment-15591808 ] Jan Zajíc commented on FLINK-3902: -- WE have same issue on Linux in docker, but *both* ! RocksDBBackend and FSStateBackend. On Windows developer local machine it works just fine. > Discarded FileSystem checkpoints are lingering around > - > > Key: FLINK-3902 > URL: https://issues.apache.org/jira/browse/FLINK-3902 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.0.2 >Reporter: Ufuk Celebi > > A user reported that checkpoints with {{FSStateBackend}} are not properly > cleaned up. > {code} > 2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: > blk_1084791727_11053122 10.10.113.10:50010 > 2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server handler > 9 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from > 10.10.113.9:49233 Call#12337 Retry#0 > org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: > `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non > empty': Directory is not empty > at > org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712) > {code} > {code} > 2016-05-10 12:20:22,636 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 62 @ 1462875622636 > 2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 62 (in 9843 ms) > 2016-05-10 12:20:52,637 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 63 @ 1462875652637 > 2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 63 (in 13909 ms) > 2016-05-10 12:21:22,636 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 64 @ 1462875682636 > {code} > Running the same program with the {{RocksDBBackend}} works as expected and > clears the old checkpoints properly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84255151

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultProcessingTimeService.java ---
@@ -35,10 +34,10 @@
 import static org.apache.flink.util.Preconditions.checkNotNull;

 /**
- * A {@link TimeServiceProvider} which assigns as current processing time the result of calling
+ * A {@link ProcessingTimeService} which assigns as current processing time the result of calling
  * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}.
  */
-public class DefaultTimeServiceProvider extends TimeServiceProvider {
+public class DefaultProcessingTimeService extends ProcessingTimeService {
--- End diff --

Done

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2628: [FLINK-3722] [runtime] Don't / and % when sorting
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2628#discussion_r84248344

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java ---
@@ -49,20 +49,40 @@ protected static int getMaxDepth(int x) {
 	 * then switch to {@link HeapSort}.
 	 */
 	public void sort(final IndexedSortable s, int p, int r) {
-		sortInternal(s, p, r, getMaxDepth(r - p));
+		int recordsPerSegment = s.recordsPerSegment();
+		int recordSize = s.recordSize();
+
+		int maxOffset = recordSize * (recordsPerSegment - 1);
+
+		int size = s.size();
+		int sizeN = size / recordsPerSegment;
+		int sizeO = (size % recordsPerSegment) * recordSize;
+
+		sortInternal(s, recordsPerSegment, recordSize, maxOffset, 0, 0, 0, size, sizeN, sizeO, getMaxDepth(r - p));
 	}

 	public void sort(IndexedSortable s) {
 		sort(s, 0, s.size());
 	}

-	private static void sortInternal(final IndexedSortable s, int p, int r, int depth) {
+	private static void sortInternal(final IndexedSortable s, int recordsPerSegment, int recordSize, int maxOffset,
+			int p, int pN, int pO, int r, int rN, int rO, int depth) {
--- End diff --

Could you please add a comment that explains all these parameters? (I understand them only because I know the original code and also what you are trying to achieve, but for someone who sees the code for the first time this will be quite scary.)

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
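Judging purely from how `sort()` derives `sizeN` and `sizeO` above, the paired arguments appear to be precomputed segment coordinates of the logical indices `p` and `r`. The following sketch states that assumed relationship; it is an inference from the diff, not the documentation the reviewer asked for.

```java
// Assumed meaning of the pairs in sortInternal(s, ..., p, pN, pO, r, rN, rO, depth):
// for a logical record index i,
//   iN = i / recordsPerSegment                 -> which memory segment holds record i
//   iO = (i % recordsPerSegment) * recordSize  -> byte offset of record i inside that segment
class SegmentCoordinates {

    static int segmentNumber(int index, int recordsPerSegment) {
        return index / recordsPerSegment;
    }

    static int segmentOffset(int index, int recordsPerSegment, int recordSize) {
        return (index % recordsPerSegment) * recordSize;
    }
}
```

Carrying these two values alongside each index is what lets the quicksort step them incrementally instead of re-dividing on every compare and swap.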
[GitHub] flink issue #2618: Refactoring the Continuous File Monitoring Function.
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2618 Thanks a lot @mxm ! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2618: Refactoring the Continuous File Monitoring Function.
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2618

Thanks for updating the description. Let me take a look at the changes.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2657: [FLINK-4853] [rm] Harden job manager registration at the ...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2657

This doesn't compile currently. Would you prefer that I review the PRs individually, or review the commits in this PR?

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4853) Clean up JobManager registration at the ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591741#comment-15591741 ]

ASF GitHub Bot commented on FLINK-4853:
---------------------------------------

Github user mxm commented on the issue: https://github.com/apache/flink/pull/2657

This doesn't compile currently. Would you prefer that I review the PRs individually, or review the commits in this PR?

> Clean up JobManager registration at the ResourceManager
> --------------------------------------------------------
>
>                 Key: FLINK-4853
>                 URL: https://issues.apache.org/jira/browse/FLINK-4853
>             Project: Flink
>          Issue Type: Sub-task
>          Components: ResourceManager
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> The current {{JobManager}} registration at the {{ResourceManager}} blocks threads in the {{RpcService.execute}} pool. This is not ideal and can be avoided by not waiting on a {{Future}} in this call.
> I propose to encapsulate the leader id retrieval operation in a distinct service so that it can be separated from the {{ResourceManager}}. This will reduce the complexity of the {{ResourceManager}} and make the individual components easier to test.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
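The "not waiting on a Future" point in the issue description is the standard blocking-versus-continuation trade-off. A tiny illustration with plain JDK futures (deliberately not Flink's RPC classes; all names here are invented):

```java
import java.util.concurrent.CompletableFuture;

public class NonBlockingRegistration {

    public static void main(String[] args) {
        CompletableFuture<String> leaderId = fetchLeaderIdAsync();

        // Blocking style the issue describes as problematic:
        //   String id = leaderId.get();  // ties up a thread in the pool
        //   register(id);

        // Non-blocking style: the registration runs as a continuation
        // once the leader id arrives, without parking a pool thread.
        leaderId.thenAccept(NonBlockingRegistration::register).join();
    }

    static CompletableFuture<String> fetchLeaderIdAsync() {
        return CompletableFuture.supplyAsync(() -> "leader-session-id");
    }

    static void register(String leaderId) {
        System.out.println("registering JobManager under " + leaderId);
    }
}
```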
[jira] [Commented] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance
[ https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591734#comment-15591734 ]

ASF GitHub Bot commented on FLINK-3722:
---------------------------------------

Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2628#discussion_r84248344

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java ---
@@ -49,20 +49,40 @@ protected static int getMaxDepth(int x) {
 	 * then switch to {@link HeapSort}.
 	 */
 	public void sort(final IndexedSortable s, int p, int r) {
-		sortInternal(s, p, r, getMaxDepth(r - p));
+		int recordsPerSegment = s.recordsPerSegment();
+		int recordSize = s.recordSize();
+
+		int maxOffset = recordSize * (recordsPerSegment - 1);
+
+		int size = s.size();
+		int sizeN = size / recordsPerSegment;
+		int sizeO = (size % recordsPerSegment) * recordSize;
+
+		sortInternal(s, recordsPerSegment, recordSize, maxOffset, 0, 0, 0, size, sizeN, sizeO, getMaxDepth(r - p));
 	}

 	public void sort(IndexedSortable s) {
 		sort(s, 0, s.size());
 	}

-	private static void sortInternal(final IndexedSortable s, int p, int r, int depth) {
+	private static void sortInternal(final IndexedSortable s, int recordsPerSegment, int recordSize, int maxOffset,
+			int p, int pN, int pO, int r, int rN, int rO, int depth) {
--- End diff --

Could you please add a comment that explains all these parameters? (I understand them only because I know the original code and also what you are trying to achieve, but for someone who sees the code for the first time this will be quite scary.)

> The divisions in the InMemorySorters' swap/compare methods hurt performance
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-3722
>                 URL: https://issues.apache.org/jira/browse/FLINK-3722
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Local Runtime
>            Reporter: Gabor Gevay
>            Assignee: Greg Hogan
>            Priority: Minor
>              Labels: performance
>
> NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods use divisions (which take a lot of time \[1\]) to calculate the index of the MemorySegment and the offset inside the segment. [~greghogan] reported on the mailing list \[2\] measuring a ~12-14% performance effect in one case.
> A possibility to improve the situation is the following:
> The way that QuickSort mostly uses these compare and swap methods is that it maintains two indices, and uses them to call compare and swap. The key observation is that these indices are mostly stepped by one, and _incrementally_ calculating the quotient and modulo is actually easy when the index changes only by one: increment/decrement the modulo, and check whether the modulo has reached 0 or the divisor, and if it did, then wrap-around the modulo and increment/decrement the quotient.
> To implement this, InMemorySorter would have to expose an iterator that would have the divisor and the current modulo and quotient as state, and have increment/decrement methods. Compare and swap could then have overloads that take these iterators as arguments.
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf
> \[2\] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Created] (FLINK-4866) Make Trigger.clear() Abstract to Enforce Implementation
Aljoscha Krettek created FLINK-4866:
---------------------------------------

             Summary: Make Trigger.clear() Abstract to Enforce Implementation
                 Key: FLINK-4866
                 URL: https://issues.apache.org/jira/browse/FLINK-4866
             Project: Flink
          Issue Type: Bug
          Components: Streaming
            Reporter: Aljoscha Krettek

If the method is not abstract, implementors of custom triggers will not realise that it could be necessary, and they will likely not clean up their state/timers properly.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
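What the proposal boils down to, in a sketch with deliberately simplified signatures (not the real org.apache.flink.streaming Trigger API): once clear() is abstract, a custom trigger no longer compiles until its author decides what to clean up.

```java
// Simplified stand-in for the real Trigger class.
abstract class Trigger<W> {

    abstract void onElement(W window) throws Exception;

    // Before the proposed change, a no-op default body could be silently
    // inherited; making it abstract forces every subclass to override it.
    abstract void clear(W window) throws Exception;
}

class MyTrigger extends Trigger<String> {

    @Override
    void onElement(String window) {
        // register timers, update per-window state
    }

    @Override
    void clear(String window) {
        // delete the timers and state registered for this window
    }
}
```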
[jira] [Updated] (FLINK-4865) FlinkML - Add EvaluateDataSet operation for LabeledVector
[ https://issues.apache.org/jira/browse/FLINK-4865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Thomas FOURNIER updated FLINK-4865:
-----------------------------------
    Description: 
We can only call the "evaluate" method on a DataSet[(Vector,Double)].
E.g., if our model is an SVM: svm.evaluate(test), where test has type DataSet[(Vector,Double)].
We would like to call it on DataSet[LabeledVector] also.

  was:
We can only call "evaluate" method on a DataSet[(Double,Vector)].
Eg: If our model is an SVM
svm.evaluate(test) with test has type DataSet[(Double,Vector)]
We would like to call it on DataSet[LabeledVector] also.

> FlinkML - Add EvaluateDataSet operation for LabeledVector
> ----------------------------------------------------------
>
>                 Key: FLINK-4865
>                 URL: https://issues.apache.org/jira/browse/FLINK-4865
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Thomas FOURNIER
>            Priority: Minor
>
> We can only call the "evaluate" method on a DataSet[(Vector,Double)].
> E.g., if our model is an SVM: svm.evaluate(test), where test has type DataSet[(Vector,Double)].
> We would like to call it on DataSet[LabeledVector] also.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (FLINK-4865) FlinkML - Add EvaluateDataSet operation for LabeledVector
[ https://issues.apache.org/jira/browse/FLINK-4865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas FOURNIER updated FLINK-4865: --- Description: We can only call "evaluate" method on a DataSet[(Double,Vector)]. Eg: If our model is an SVM svm.evaluate(test) with test has type DataSet[(Double,Vector)] We would like to call it on DataSet[LabeledVector] also. was: We can only call "evaluate" method on a DataSet[(Double,Vector)]. Eg: svm/evaluate(test) where test: DataSet[(Double,Vector)] We want also to call this method on DataSet[LabeledVector] > FlinkML - Add EvaluateDataSet operation for LabeledVector > - > > Key: FLINK-4865 > URL: https://issues.apache.org/jira/browse/FLINK-4865 > Project: Flink > Issue Type: New Feature >Reporter: Thomas FOURNIER >Priority: Minor > > We can only call "evaluate" method on a DataSet[(Double,Vector)]. > Eg: If our model is an SVM > svm.evaluate(test) with test has type DataSet[(Double,Vector)] > We would like to call it on DataSet[LabeledVector] also. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4865) FlinkML - Add EvaluateDataSet operation for LabeledVector
Thomas FOURNIER created FLINK-4865: -- Summary: FlinkML - Add EvaluateDataSet operation for LabeledVector Key: FLINK-4865 URL: https://issues.apache.org/jira/browse/FLINK-4865 Project: Flink Issue Type: New Feature Reporter: Thomas FOURNIER Priority: Minor We can only call "evaluate" method on a DataSet[(Double,Vector)]. Eg: svm/evaluate(test) where test: DataSet[(Double,Vector)] We want also to call this method on DataSet[LabeledVector] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2667#discussion_r84242735 --- Diff: flink-contrib/docker-flink/docker-compose.sh --- @@ -0,0 +1,4 @@ +#!/bin/sh --- End diff -- Could we name this file `bluemix-docker-compose.sh`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2667#discussion_r84242416

--- Diff: flink-contrib/docker-flink/docker-compose.yml ---
@@ -20,16 +20,20 @@ version: "2"
 services:
   jobmanager:
     image: flink
+    container_name: "jobmanager"
+    expose:
+      - "6123"
     ports:
       - "48081:8081"
     command: jobmanager
-    volumes:
-      - /opt/flink/conf
   taskmanager:
     image: flink
+    expose:
+      - "6121"
+      - "6122"
     depends_on:
       - jobmanager
     command: taskmanager
-    volumes_from:
-      - jobmanager:ro
+    links:
--- End diff --

`links` are a legacy feature as of Docker 1.9.0, but it's probably fine to stick with them for now.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-4864) Shade Calcite dependency in flink-table
Fabian Hueske created FLINK-4864: Summary: Shade Calcite dependency in flink-table Key: FLINK-4864 URL: https://issues.apache.org/jira/browse/FLINK-4864 Project: Flink Issue Type: Improvement Components: Table API & SQL Affects Versions: 1.2.0 Reporter: Fabian Hueske The Table API has a dependency on Apache Calcite. A user reported to have version conflicts when having a own Calcite dependency in the classpath. The solution would be to shade away the Calcite dependency (Calcite's transitive dependencies are already shaded). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LaveledVector
[ https://issues.apache.org/jira/browse/FLINK-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591644#comment-15591644 ]

Thomas FOURNIER commented on FLINK-4850:
----------------------------------------

Ok, I'm creating a specific JIRA issue for adding an EvaluateDataSet operation for LabeledVector.

> FlinkML - SVM predict Operation for Vector and not LaveledVector
> ----------------------------------------------------------------
>
>                 Key: FLINK-4850
>                 URL: https://issues.apache.org/jira/browse/FLINK-4850
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Thomas FOURNIER
>            Assignee: Theodore Vasiloudis
>
> It seems that evaluate operation is defined for Vector and not LabeledVector.
> It impacts QuickStart guide for FlinkML when using SVM.
> We need to update the documentation as follows:
> val astroTest:DataSet[(Vector,Double)] = MLUtils
>   .readLibSVM(env, "src/main/resources/svmguide1.t")
>   .map(l => (l.vector, l.label))
> val predictionPairs = svm.evaluate(astroTest)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LaveledVector
[ https://issues.apache.org/jira/browse/FLINK-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas FOURNIER updated FLINK-4850: --- Description: It seems that evaluate operation is defined for Vector and not LabeledVector. It impacts QuickStart guide for FlinkML when using SVM. We need to update the documentation as follows: val astroTest:DataSet[(Vector,Double)] = MLUtils .readLibSVM(env, "src/main/resources/svmguide1.t") .map(l => (l.vector, l.label)) val predictionPairs = svm.evaluate(astroTest) was: It seems that evaluate operation is defined for Vector and not LabeledVector. It impacts QuickStart guide for FlinkML when using SVM. 1- We need to update the documentation as follows: val astroTest:DataSet[(Vector,Double)] = MLUtils .readLibSVM(env, "src/main/resources/svmguide1.t") .map(l => (l.vector, l.label)) val predictionPairs = svm.evaluate(astroTest) 2- Update code such that LabeledVector can be used with evaluate method > FlinkML - SVM predict Operation for Vector and not LaveledVector > > > Key: FLINK-4850 > URL: https://issues.apache.org/jira/browse/FLINK-4850 > Project: Flink > Issue Type: Bug >Reporter: Thomas FOURNIER >Assignee: Theodore Vasiloudis > > It seems that evaluate operation is defined for Vector and not LabeledVector. > It impacts QuickStart guide for FlinkML when using SVM. > We need to update the documentation as follows: > val astroTest:DataSet[(Vector,Double)] = MLUtils > .readLibSVM(env, "src/main/resources/svmguide1.t") > .map(l => (l.vector, l.label)) > val predictionPairs = svm.evaluate(astroTest) -- This message was sent by Atlassian JIRA (v6.3.4#6332)